diff --git a/src/manual/endpoint-kafka.adoc b/src/manual/endpoint-kafka.adoc index 60daafd48e..b190b0a3c3 100644 --- a/src/manual/endpoint-kafka.adoc +++ b/src/manual/endpoint-kafka.adoc @@ -1,5 +1,5 @@ [[kafka]] -= Apache Kafka support += Apache Kafka Support Kafka is a distributed streaming platform that enables you to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. Citrus provides support for publishing/consuming records to/from a Kafka topic. @@ -44,7 +44,7 @@ Now you are able to use customized Citrus XML elements in order to define the Ka using the Spring Java configuration with `@Bean` annotations you do not require this step. [[kafka-endpoint]] -== Kafka endpoint +== Kafka Endpoint By default, Citrus Kafka endpoints are asynchronous. Asynchronous messaging means that the endpoint will not wait for any response message after sending or receiving a message. @@ -103,20 +103,22 @@ public KafkaEndpoint helloKafkaEndpoint() { consumer-group="citrus_group"/> ---- -The endpoint is now ready to be used inside a test case. The test simply references the endpoint by its name when sending -or receiving. +Note that for effective message consumption it is advisable to use random consumer groups. +Both the Java and XML DSL support random consumer groups, when enabled: `randomConsumerGroup(true)` or `random-consumer-group="true"`. -In case of a send operation the endpoint creates a Kafka producer and will simply publish the records to the defined Kafka -topic. As the communication is asynchronous by default producer does not wait for a synchronous response. +The endpoint is now ready to be used inside a test case. +The test simply references the endpoint by its name when sending or receiving. -In case of a receive operation the endpoint creates a Kafka consumer instance in the defined consumer group. The consumer -starts a subscription on the given topic and acts as a polling listener. This means that the message consumer connects -to the given topic and polls for records. As soon as a record has been received the action is ready to perform the validation -process that verifies the record. +In case of a send operation the endpoint creates a Kafka producer and will simply publish the records to the defined Kafka topic. +As the communication is asynchronous by default, the producer does not wait for a response. -NOTE: By default, the consumer polls for a single record per receive operation (`max.poll.records=1`). You can change this -setting within the consumer properties on the Citrus Kafka endpoint. The property `max.poll.records` that sets the number -of records in a single poll. +In case of a receive operation the endpoint creates a Kafka consumer instance in the defined (possibly random) consumer group. + +NOTE: By default, the consumer starts a subscription on the given topic and acts as a polling listener. +This means that the message consumer connects to the given topic and polls one single record at a time. +The action is the ready to perform the validation process that verifies the record, once it has been received. +This approach requires looping to find specific Kafka messages, which can be highly inefficient, especially when the Kafka topic experiences high traffic. +See <> for a more effective Kafka message selection. [[kafka-endpoint-configuration]] === Configuration @@ -127,114 +129,124 @@ The following table shows all available settings on a Kafka endpoint in Citrus: |=== | Property | Mandatory | Default | Description -| id +| `id` | Yes | - | Identifying name of the endpoint. Only required for XML configuration. -| topic +| `topic` | No | - | Default topic to use with this endpoint. Multiple topics are supported by using a comma delimited list of names (e.g. `topic1,topic2,topicN`). If not specified the test case send operation needs to set the topic as message header information. -| server +| `server` | No -| localhost:9092 -| A comma delimited list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Usually - it is only required to connect to one Kafka server instance in the cluster. Kafka then makes sure that the endpoint is - automatically introduced to all other servers in the cluster. This list only impacts the initial hosts used to discover - the full set of servers. +| `localhost:9092` +| A comma delimited list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + Usually it is only required to connect to one Kafka server instance in the cluster. + Kafka then makes sure that the endpoint is automatically introduced to all other servers in the cluster. + This list only impacts the initial hosts used to discover the full set of servers. -| timeout +| `timeout` | No -| 5000 -| Timeout in milliseconds. For producers the timeout is set as time to wait for the message to be accepted by the cluster. +| `5000` +| Timeout in milliseconds. + For producers the timeout is set as time to wait for the message to be accepted by the cluster. For consumers the timeout is used for polling records on a specific topic. -| message-converter +| `message-converter` | No | `org.citrusframework.kafka.message.KafkaMessageConverter` -| Converter maps internal Citrus message objects to ProducerRecord/ConsumerRecord objects. The converter implementation takes - care of message key, value, timestamp and special message headers. +| Converter maps internal Citrus message objects to `ProducerRecord`/`ConsumerRecord` objects. + The converter implementation takes care of message key, value, timestamp and special message headers. -| header-mapper +| `header-mapper` | No | `org.citrusframework.kafka.message.KafkaMessageHeaderMapper` -| Header mapper maps Kafka record information (e.g. topic name, timestamp, message key) to internal message headers - (`org.citrusframework.kafka.message.KafkaMessageHeaders`) and vice versa. +| Header mapper maps Kafka record information (e.g. topic name, timestamp, message key) to internal message headers (`org.citrusframework.kafka.message.KafkaMessageHeaders`) and vice versa. -| auto-commit +| `auto-commit` | No -| true -| When this setting is enabled the consumer will automatically commit consumed records so the offset pointer on the Kafka - topic is set to the next record. +| `true` +| When this setting is enabled the consumer will automatically commit consumed records so the offset pointer on the Kafka topic is set to the next record. -| auto-commit-interval +| `auto-commit-interval` | No -| 1000 +| `1000` | Interval in milliseconds the auto commit operation on consumed records is performed. | offset-reset | No -| earliest -| When consuming records from a topic partition and the current offset does not exist on that partition Kafka will automatically - seek to a valid offset position on that partition. The `offset-reset` setting sets where to find the new position (latest, earliest, - none). If `none` is set the consumer will receive an exception instead of resetting the offset to a valid position. +| `earliest` +| When consuming records from a topic partition and the current offset does not exist on that partition Kafka will automatically seek to a valid offset position on that partition. + The `offset-reset` setting sets where to find the new position (`latest`, `earliest`, `none`). + If `none` is set the consumer will receive an exception instead of resetting the offset to a valid position. -| partition +| `partition` | No -| 0 +| `0` | Partition id that the consumer will be assigned to. -| consumer-group +| `consumer-group` +| No +| `citrus_kafka_group` +| Consumer group name. +Please keep in mind that records are load balanced across consumer instances with the same consumer group name set. +So you might run into message timeouts when using multiple Kafka endpoints with the same consumer group name. + +| `random-consumer-group` | No -| citrus_kafka_group -| Consumer group name. Please keep in mind that records are load balanced across consumer instances with the same consumer - group name set. So you might run into message timeouts when using multiple Kafka endpoints with the same consumer group name. +| `false` +| Whether to use random consumer gorup names. + Note that these will all be prefixed by `citrus_kafka_` and end with a random 10 characters alphabetic suffix. -| key-serializer +| `key-serializer` | No -| org.apache.kafka.common.serialization.StringSerializer -| Serializer implementation that converts message key values. By default keys are serialized to String values. +| `org.apache.kafka.common.serialization.StringSerializer` +| Serializer implementation that converts message key values. + By default, keys are serialized to String values. -| key-deserializer +| `key-deserializer` | No -| org.apache.kafka.common.serialization.StringDeserializer -| Deserializer implementation that converts message key values. By default keys are deserialized as String values. +| `org.apache.kafka.common.serialization.StringDeserializer` +| Deserializer implementation that converts message key values. + By default, keys are deserialized as String values. -| value-serializer +| `value-serializer` | No -| org.apache.kafka.common.serialization.StringSerializer -| Serializer implementation that converts record values. By default values are serialized to String values. +| `org.apache.kafka.common.serialization.StringSerializer` +| Serializer implementation that converts record values. + By default values are serialized to String values. -| value-deserializer +| `value-deserializer` | No -| org.apache.kafka.common.serialization.StringDeserializer -| Deserializer implementation that converts record values. By default values are deserialized as String values. +| `org.apache.kafka.common.serialization.StringDeserializer` +| Deserializer implementation that converts record values. + By default, values are deserialized as String values. -| client-id +| `client-id` | No -| citrus_kafka_[producer/consumer]_{randomUUID} -| An id string to pass to the server when producing/consuming records. Used as logical application name to be included in - server-side request logging. +| `citrus_kafka_[producer/consumer]_{randomUUID}` +| An id string to pass to the server when producing/consuming records. + Used as logical application name to be included in server-side request logging. -| consumer-properties +| `consumer-properties` | No | - -| Map of consumer property settings to apply to the Kafka consumer configuration. This enables you to overwrite any consumer - setting with respective property key value pairs. +| Map of consumer property settings to apply to the Kafka consumer configuration. + This enables you to overwrite any consumer setting with respective property key value pairs. -| producer-properties +| `producer-properties` | No | - -| Map of producer property settings to apply to the Kafka producer configuration. This enables you to overwrite any producer - setting with respective property key value pairs. +| Map of producer property settings to apply to the Kafka producer configuration. + This enables you to overwrite any producer setting with respective property key value pairs. |=== [[kafka-endpoint-properties]] -=== Producer and consumer properties +=== Producer and Consumer Properties The Citrus Kafka endpoint component is also able to receive a map of Kafka producer and consumer properties. These property settings overwrite any predefined setting on the producer/consumer instance created by the endpoint. You can use the Kafka @@ -287,12 +299,12 @@ private Map getConsumerProps() { ---- [[kafka-synchronous-endpoints]] -== Kafka synchronous endpoints +== Kafka Synchronous Endpoints Not implemented yet. [[kafka-message-headers]] -== Kafka message headers +== Kafka Message Headers The Kafka Citrus integration defines a set of special message header entries that are either used to manipulate the endpoint behavior or as validation object. These Kafka specific headers are stored with a header key prefix `citrus_kafka_*`. You @@ -404,7 +416,7 @@ These are the available Kafka message headers in Citrus: |=== [[kafka-message]] -== Kafka message +== Kafka Message Citrus also provides a Kafka message implementation that you can use on any send and receive operation. This enables you to set special message headers in a more comfortable way when using the Java fluent API: @@ -421,8 +433,154 @@ send(helloKafkaEndpoint) The message implementation provides fluent API builder methods for each Kafka specific header. +Additionally, when receiving messages, you might want to use <>. + +[[kafka-message-selector]] +== Kafka Message Selector + +The Kafka Message Selector feature allows you to selectively receive messages from a Kafka topic based on specific criteria. +This powerful functionality enables you to filter Kafka messages by different criteria, e.g. <>. +Additionally, the defined time window for message retrieval significantly improves the performance. +Imagine a large Kafka topic with thousands of events. +Looking through all of these would require an immense amount of resources and time. +Instead, selective message consumption starts at an offset `Ox = OT-n`. +Where `T` is the current timestamp and `n` is the maximum timespan in which the wanted event is expected to have been published. + +=== Basic Usage + +The Kafka Message Selector can be used in various ways, depending on your preferred syntax and test framework. + +.Java +[source,java,indent=0,role="primary"] +---- +then( + receive(kafkaEndpoint) + .selector( + kafkaMessageFilter() + .eventLookbackWindow(Duration.ofSeconds(1L)) + .kafkaMessageSelector(kafkaHeaderEquals("key", "value")) + .build() + ) +); +---- + +.Java 2 +[source,java,indent=0,role="secondary"] +---- +then( + kafkaEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), "key", "value") +); +---- + +.XML +[source,xml,indent=0,role="secondary"] +---- + + Receive selective Kafka message + + + + + + +---- + +=== Configuration + +[cols="2,2,2"] +|=== +| Java DSL | XML DSL | Description + +| `eventLookbackWindow` +| `event-lookback-window` +| This defines how far back in time the selector should search for messages. + When using XML configuration, the event lookback window must be specified as an <>. + For example, `PT1S` represents a duration of 1 second. + +| `kafkaMessageSelector` +| See <>. + For example, `PT0.100S` represents a duration of 1 millisecond. + +|=== + +[[kafka-message-selector-types]] +=== Selector Types + +.Message Header + +The framework provides two main types of message header selectors. +From within the Java DSL, these two can be easily invoked using statically provided methods: + +1. `kafkaHeaderEquals`: Matches messages where the specified header `key` exactly equals the given `value`. +2. `kafkaHeaderContains`: Matches messages where the specified header `key` contains the given `value` as a substring. + +More advanced users might want to do pre- or suffix matching. +That is also possible. + +.Java +[source,java,indent=0,role="primary"] +---- +then( + receive(kafkaWithRandomConsumerGroupEndpoint) + .selector( + kafkaMessageFilter() + .eventLookbackWindow(Duration.ofSeconds(1L)) + .kafkaMessageSelector( + KafkaMessageByHeaderSelector.builder() + .key("key") + .value("prefix") + .matchingMechanism(STARTS_WITH) + .build() + ) + .build() + ) +); +---- + +Note that if the specified `key` is `null`, all headers in the record will be matched against the `value`. +If the `value` is `null` however, all headers with the exact `key` match. + +[cols="2,2,2"] +|=== +| Java DSL | XML DSL | Description + +| `key` +| `header-filter-key` +| Key-filter being applied to Kafka messages. + Matches exact if specified, all keys if `null` or empty. + +| `value` +| `header-filter-value` +| Value-filter being applied to Kafka messages. + Matches all values if `null` or empty. + Otherwise matches as specified by strategy. + +| `valueMatchingStrategy` +| `header-filter-comparator` +| Specifies how the `value` is being matched. + Must be one of `EQUALS`, `CONTAINS`, `STARTS_WITH` or `ENDS_WITH`. + It defaults to `EQUALS`, if not specified. + +|=== + +=== Best Practices + +*Set Appropriate Lookback Window:* Choose a lookback window that balances between finding the desired message and performance. +A larger window might find older messages but could impact performance. + +*Combine with Other Citrus Features:* The Kafka Message Selector can be combined with other Citrus testing features for comprehensive Kafka integration testing. + [[dynamic-kafka-endpoints]] -== Dynamic Kafka endpoints +== Dynamic Kafka Endpoints As we have seen before the topic name can be overwritten in each send and receive operation by specifying the `citrus_kafka_topic` message header. In addition to that you can make use of completely dynamic Kafka endpoints, too. @@ -477,7 +635,7 @@ You can add multiple parameters to the endpoint url in order to set properties o about dynamic endpoints in chapter link:#dynamic-endpoint-components[dynamic endpoints]. [[embedded-kafka-server]] -== Embedded Kafka server +== Embedded Kafka Server The Kafka message broker is composed of a Zookeeper server and a Kafka server. Citrus provides an embedded server (*for testing purpose only!*) that is able to be started within your integration test environment. The server cluster is configured with one single Zookeeper @@ -526,7 +684,7 @@ The embedded server component provides following properties to set: | zookeeper-port | java.lang.Integer -| Zookeeper server port. By default a random port is used. +| Zookeeper server port. By default, a random port is used. | broker-properties | java.util.Map