Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sending standardized uuid and priority values #92

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.31.0] - 2024-10-07

### Changed

Added two new methods to the `TkmsMessage` that allow to conveniently use standard uuid and priority headers:

- `x-wise-uuid` - defines uniqueness of the message.
- `x-wise-priority` - defines priority of the message. Lower number - higher priority.

Consumers of messages that have UUID and priority headers can efficiently use provided values for deduplication and other processing purposes with no need to deserialize payloads.

Best practices for setting UUID header value:
- Likely the UUID value provided will be stored and indexed on consumer side. It's recommended to use sequential UUIDs in such scenarios, which proved to yield better performance. One way to generate sequential UUIDs is by using [tw-base-utils](https://github.com/transferwise/tw-base-utils/blob/master/tw-base-utils/src/main/java/com/transferwise/common/baseutils/UuidUtils.java) library.
- If payload already has UUID value then set the same value in header. It ensures that consumers of such messages can consistently deduplicate them by depending on one of those UUIDs. It simplifies consumers migration to standard header based UUID deduplication.
- If custom message identification mechanism is used (not based on UUID), still generate and add UUID to the headers. However, be mindful of cases when messages are sent in non-transactional environments. For example, the same message might be sent twice with different UUIDs but the same identity (according to the custom identification mechanism).

## [0.30.1] - 2024-08-08
### Changed
- MeterFilter's applied by the library are no longer explicitly applied and are instead
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.1
version=0.31.0
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.transferwise.kafka.tkms.api;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.kafka.tkms.CompressionAlgorithm;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.PositiveOrZero;
Expand Down Expand Up @@ -71,6 +74,39 @@ public class TkmsMessage {
*/
private Map<?, ?> metadata;

/**
* Adds {@code x-wise-uuid} header to the message, which uniquely identifies this message for consumers.
*
* <p>Having UUID in header allows consumers to run deduplication check on this value without need to deserialize payload.
* If payload provides uuid it must be the same as this value so that consumers that depend on either of these values can have consistent
* deduplication.
*
* <p>Prefer using sequential uuids (e.g. {@link UuidUtils#generatePrefixCombUuid()}) which are proved to yield better performance.
*/
public TkmsMessage addUuidHeader(UUID uuid) {
return addHeader(
new Header()
.setKey("x-wise-uuid")
.setValue(uuid.toString().getBytes(StandardCharsets.UTF_8))
);
}

/**
* Adds {@code x-wise-priority} header to the message, which defines priority of this message for consumers.
*
* <p>Lower value means higher priority. For example, 0 is higher priority than 10.
*
* <p>Having priority in header allows consumers to derive priority without need to deserialize payload. For example, it can be useful
* when consumers filter messages based on priority before deciding how to process those.
*/
public TkmsMessage addPriorityHeader(long priority) {
return addHeader(
new Header()
.setKey("x-wise-priority")
.setValue(Long.toString(priority).getBytes(StandardCharsets.UTF_8))
);
}

public TkmsMessage addHeader(Header header) {
if (headers == null) {
headers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessageRequest;
Expand All @@ -28,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -107,6 +109,8 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
for (int i = 0; i < messageMultiplier; i++) {
sb.append(messagePart);
}
var uuid = UuidUtils.generatePrefixCombUuid();
var priority = 17L;

tkmsStorageToKafkaProxy.pause();

Expand All @@ -116,10 +120,20 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
Consumer<ConsumerRecord<String, String>> messageCounter = cr -> ExceptionUtils.doUnchecked(() -> {
TestEvent receivedEvent = objectMapper.readValue(cr.value(), TestEvent.class);
if (receivedEvent.getMessage().equals(message)) {
assertThat(cr.headers().toArray()).hasSize(1);
org.apache.kafka.common.header.Header header = cr.headers().toArray()[0];
assertThat(header.key()).isEqualTo("x-tw-criticality");
assertThat(new String(header.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol");
assertThat(cr.headers().toArray()).hasSize(3);

org.apache.kafka.common.header.Header criticalityHeader = cr.headers().toArray()[0];
assertThat(criticalityHeader.key()).isEqualTo("x-tw-criticality");
assertThat(new String(criticalityHeader.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol");

org.apache.kafka.common.header.Header priorityHeader = cr.headers().toArray()[1];
assertThat(priorityHeader.key()).isEqualTo("x-wise-priority");
assertThat(Long.parseLong(new String(priorityHeader.value(), StandardCharsets.UTF_8))).isEqualTo(priority);

org.apache.kafka.common.header.Header uuidHeader = cr.headers().toArray()[2];
assertThat(uuidHeader.key()).isEqualTo("x-wise-uuid");
assertThat(UUID.fromString(new String(uuidHeader.value(), StandardCharsets.UTF_8))).isEqualTo(uuid);

receivedCount.incrementAndGet();
} else {
throw new IllegalStateException("Wrong message receive: " + receivedEvent.getMessage());
Expand All @@ -133,9 +147,18 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
await().until(() -> tkmsStorageToKafkaProxy.isPaused());

transactionsHelper.withTransaction().run(() -> {
var result = transactionalKafkaMessageSender
.sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent))
.addHeader(new Header().setKey("x-tw-criticality").setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8))));
var result = transactionalKafkaMessageSender.sendMessage(
new TkmsMessage()
.setTopic(testProperties.getTestTopic())
.setValue(toJsonBytes(testEvent))
.addHeader(
new Header()
.setKey("x-tw-criticality")
.setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8))
)
.addPriorityHeader(priority)
.addUuidHeader(uuid)
);

var messagesCount = tkmsTestDao.getMessagesCount(result.getShardPartition());
if (deferUntilCommit) {
Expand Down
Loading