diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b6ad42..4d24d40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,17 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -Added two new properties to the `TkmsMessage`: +Added two new methods to the `TkmsMessage` that allow to conveniently use standard uuid and priority headers: -- `uuid` - specifies uniqueness of the message. Will be delivered to consumers in the `x-wise-uuid` header. -- `priority` - specifies priority of the message. Lower number - higher priority. Will be delivered to consumers in the `x-wise-uuid` header. +- `x-wise-uuid` - defines uniqueness of the message. +- `x-wise-priority` - defines priority of the message. Lower number - higher priority. Consumers of messages that have UUID and priority headers can efficiently use provided values for deduplication and other processing purposes with no need to deserialize payloads. -Best practices for setting UUID value: +Best practices for setting UUID header value: - Likely the UUID value provided will be stored and indexed on consumer side. It's recommended to use sequential UUIDs in such scenarios, which proved to yield better performance. One way to generate sequential UUIDs is by using [tw-base-utils](https://github.com/transferwise/tw-base-utils/blob/master/tw-base-utils/src/main/java/com/transferwise/common/baseutils/UuidUtils.java#L37) library. -- If payload already has UUID value then assign the same value to the corresponding `TkmsMessage`. It ensures that consumers of such messages can consistently deduplicate them by depending on one of those UUIDs. It simplifies consumers migration to standard header based UUID deduplication. -- If custom message identification mechanism is used (not based on UUID), still generate and assign UUID to the messages. However, be mindful of cases when messages are sent in non-transactional environments. For example, the same message might be sent twice with different UUIDs but the same identity (according to the custom identification mechanism). +- If payload already has UUID value then set the same value in header. It ensures that consumers of such messages can consistently deduplicate them by depending on one of those UUIDs. It simplifies consumers migration to standard header based UUID deduplication. +- If custom message identification mechanism is used (not based on UUID), still generate and add UUID to the headers. However, be mindful of cases when messages are sent in non-transactional environments. For example, the same message might be sent twice with different UUIDs but the same identity (according to the custom identification mechanism). ## [0.30.1] - 2024-08-08 ### Changed diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java deleted file mode 100644 index 0ba90ed..0000000 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.transferwise.kafka.tkms; - -/** - * Set of standard headers used by the library. - */ -final class StandardHeaders { - - static final String X_WISE_UUID = "x-wise-uuid"; - static final String X_WISE_PRIORITY = "x-wise-priority"; -} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 37ec532..5c84134 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -19,7 +19,6 @@ import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationType; import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -149,11 +148,7 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool @Override public SendMessagesResult sendMessages(SendMessagesRequest request) { request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept)); - for (int i = 0; i < request.getTkmsMessages().size(); i++) { - TkmsMessage tkmsMessage = request.getTkmsMessages().get(i); - addStandardHeaders(tkmsMessage); - validateMessage(tkmsMessage, i); - } + validateMessages(request); var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); var validatedTopics = new HashSet(); @@ -268,7 +263,6 @@ public SendMessageResult sendMessage(SendMessageRequest request) { checkActiveTransaction(shardPartition.getShard(), transactionActive, deferMessageRegistrationUntilCommit); - addStandardHeaders(request.getTkmsMessage()); validateMessage(message, 0); validateMessageSize(message, 0); @@ -380,6 +374,13 @@ public void afterCompletion(int status) { } } + protected void validateMessages(SendMessagesRequest request) { + for (int i = 0; i < request.getTkmsMessages().size(); i++) { + var tkmsMessage = request.getTkmsMessages().get(i); + validateMessage(tkmsMessage, i); + } + } + protected void validateMessage(TkmsMessage message, int messageIdx) { Preconditions.checkNotNull(message, "%s: No message provided.", messageIdx); Preconditions.checkArgument(!Strings.isNullOrEmpty(message.getTopic()), "%s: No topic provided.", messageIdx); @@ -433,24 +434,6 @@ protected void validateMessageSize(TkmsMessage message, int messageIdx) { } } - private static void addStandardHeaders(TkmsMessage tkmsMessage) { - if (tkmsMessage.getPriority() != null) { - tkmsMessage.addHeader( - new Header() - .setKey(StandardHeaders.X_WISE_PRIORITY) - .setValue(tkmsMessage.getPriority().toString().getBytes(StandardCharsets.UTF_8)) - ); - } - // uuid shall remain last header, so it can be quickly accessed using Headers#lastHeader - if (tkmsMessage.getUuid() != null) { - tkmsMessage.addHeader( - new Header() - .setKey(StandardHeaders.X_WISE_UUID) - .setValue(tkmsMessage.getUuid().toString().getBytes(StandardCharsets.UTF_8)) - ); - } - } - private int utf8Length(CharSequence s) { if (s == null) { return 0; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java index 98e2dc9..4b66cba 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java @@ -3,6 +3,7 @@ import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.kafka.tkms.CompressionAlgorithm; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -74,29 +75,37 @@ public class TkmsMessage { private Map metadata; /** - * Uniquely identifies this message for consumers. + * Adds {@code x-wise-uuid} header to the message, which uniquely identifies this message for consumers. * - *

If value is set then it will be added to headers with {@code x-wise-uuid} key. - * - *

Having UUID in header allows consumer to run deduplication check on this value without need to deserialize payload. + *

Having UUID in header allows consumers to run deduplication check on this value without need to deserialize payload. * If payload provides uuid it must be the same as this value so that consumers that depend on either of these values can have consistent * deduplication. * *

Prefer using sequential uuids (e.g. {@link UuidUtils#generatePrefixCombUuid()}) which are proved to yield better performance. */ - private UUID uuid; + public TkmsMessage addUuidHeader(UUID uuid) { + return addHeader( + new Header() + .setKey("x-wise-uuid") + .setValue(uuid.toString().getBytes(StandardCharsets.UTF_8)) + ); + } /** - * Defines priority of this message for consumers. + * Adds {@code x-wise-priority} header to the message, which defines priority of this message for consumers. * *

Lower value means higher priority. For example, 0 is higher priority than 10. * - *

If value is set then it will be added to headers with {@code x-wise-priority} key. - * - *

Having priority in header allows consumer to derive priority without need to deserialize payload. For example, it can be useful + *

Having priority in header allows consumers to derive priority without need to deserialize payload. For example, it can be useful * when consumers filter messages based on priority before deciding how to process those. */ - private Long priority; + public TkmsMessage addPriorityHeader(long priority) { + return addHeader( + new Header() + .setKey("x-wise-priority") + .setValue(Long.toString(priority).getBytes(StandardCharsets.UTF_8)) + ); + } public TkmsMessage addHeader(Header header) { if (headers == null) { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index b0db4f4..51dc59d 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -127,11 +127,11 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) { assertThat(new String(criticalityHeader.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol"); org.apache.kafka.common.header.Header priorityHeader = cr.headers().toArray()[1]; - assertThat(priorityHeader.key()).isEqualTo(StandardHeaders.X_WISE_PRIORITY); + assertThat(priorityHeader.key()).isEqualTo("x-wise-priority"); assertThat(Long.parseLong(new String(priorityHeader.value(), StandardCharsets.UTF_8))).isEqualTo(priority); org.apache.kafka.common.header.Header uuidHeader = cr.headers().toArray()[2]; - assertThat(uuidHeader.key()).isEqualTo(StandardHeaders.X_WISE_UUID); + assertThat(uuidHeader.key()).isEqualTo("x-wise-uuid"); assertThat(UUID.fromString(new String(uuidHeader.value(), StandardCharsets.UTF_8))).isEqualTo(uuid); receivedCount.incrementAndGet(); @@ -149,8 +149,6 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) { transactionsHelper.withTransaction().run(() -> { var result = transactionalKafkaMessageSender.sendMessage( new TkmsMessage() - .setUuid(uuid) - .setPriority(priority) .setTopic(testProperties.getTestTopic()) .setValue(toJsonBytes(testEvent)) .addHeader( @@ -158,6 +156,8 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) { .setKey("x-tw-criticality") .setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8)) ) + .addPriorityHeader(priority) + .addUuidHeader(uuid) ); var messagesCount = tkmsTestDao.getMessagesCount(result.getShardPartition());