diff --git a/src/manual/endpoint-kafka.adoc b/src/manual/endpoint-kafka.adoc index b190b0a3c3..f174eed471 100644 --- a/src/manual/endpoint-kafka.adoc +++ b/src/manual/endpoint-kafka.adoc @@ -264,11 +264,11 @@ public KafkaEndpoint helloKafkaEndpoint() { } private Map getProducerProps() { - ... + // ... } private Map getConsumerProps() { - ... + // ... } ---- @@ -355,7 +355,7 @@ send(helloKafkaEndpoint) ---- -In case of a receive operation message headers are valuable validation objects that can be used to verify the message content with +In case of a receiving operation message headers are valuable validation objects that can be used to verify the message content with an expected behavior. .Java @@ -446,6 +446,7 @@ Looking through all of these would require an immense amount of resources and ti Instead, selective message consumption starts at an offset `Ox = OT-n`. Where `T` is the current timestamp and `n` is the maximum timespan in which the wanted event is expected to have been published. +[[kafka-message-selector-basic]] === Basic Usage The Kafka Message Selector can be used in various ways, depending on your preferred syntax and test framework. @@ -485,6 +486,7 @@ then( ---- +[[kafka-message-selector-configuration]] === Configuration [cols="2,2,2"] @@ -494,7 +496,7 @@ then( | `eventLookbackWindow` | `event-lookback-window` | This defines how far back in time the selector should search for messages. - When using XML configuration, the event lookback window must be specified as an <>. + When using XML configuration, the event lookback window must be specified as an https://en.wikipedia.org/wiki/ISO_8601[ISO-8601 duration string]. For example, `PT1S` represents a duration of 1 second. | `kafkaMessageSelector` @@ -507,7 +509,7 @@ then( | The timeout duration for each poll operation when consuming messages from Kafka. This value determines how long the consumer will wait for new records in each poll cycle. It is not the overall receive action timeout! - When using XML configuration, the poll timeout must be specified as an <>. + When using XML configuration, the poll timeout must be specified as an https://en.wikipedia.org/wiki/ISO_8601[ISO-8601 duration string]. For example, `PT0.100S` represents a duration of 1 millisecond. |=== @@ -572,6 +574,7 @@ If the `value` is `null` however, all headers with the exact `key` match. |=== +[[kafka-message-selector-best-practices]] === Best Practices *Set Appropriate Lookback Window:* Choose a lookback window that balances between finding the desired message and performance. @@ -638,7 +641,7 @@ about dynamic endpoints in chapter link:#dynamic-endpoint-components[dynamic end == Embedded Kafka Server The Kafka message broker is composed of a Zookeeper server and a Kafka server. Citrus provides an embedded server (*for testing purpose only!*) -that is able to be started within your integration test environment. The server cluster is configured with one single Zookeeper +that is able to start within your integration test environment. The server cluster is configured with one single Zookeeper server and a single Kafka server. You can define server ports and broker properties such as topics, number of partitions and broker ids. Given topics are automatically added via admin client on the Kafka server with given amount of partitions.