();
@@ -263,6 +268,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {
checkActiveTransaction(shardPartition.getShard(), transactionActive, deferMessageRegistrationUntilCommit);
+ addStandardHeaders(request.getTkmsMessage());
validateMessage(message, 0);
validateMessageSize(message, 0);
@@ -374,13 +380,6 @@ public void afterCompletion(int status) {
}
}
- protected void validateMessages(SendMessagesRequest request) {
- for (int i = 0; i < request.getTkmsMessages().size(); i++) {
- var tkmsMessage = request.getTkmsMessages().get(i);
- validateMessage(tkmsMessage, i);
- }
- }
-
protected void validateMessage(TkmsMessage message, int messageIdx) {
Preconditions.checkNotNull(message, "%s: No message provided.", messageIdx);
Preconditions.checkArgument(!Strings.isNullOrEmpty(message.getTopic()), "%s: No topic provided.", messageIdx);
@@ -396,13 +395,23 @@ protected void validateMessage(TkmsMessage message, int messageIdx) {
message.getShard(), properties.getShardsCount());
}
Preconditions.checkNotNull(message.getValue(), "%s: Value can not be null.", messageIdx);
+ boolean uuidHeaderPresent = false;
if (message.getHeaders() != null) {
for (int headerIdx = 0; headerIdx < message.getHeaders().size(); headerIdx++) {
Header header = message.getHeaders().get(headerIdx);
Preconditions.checkNotNull(header.getValue(), "%s: Header value @{%s} can not be null.", messageIdx, headerIdx);
Preconditions.checkArgument(!Strings.isNullOrEmpty(header.getKey()), "%s: Header key @{%s} can not be null.", messageIdx, headerIdx);
+ uuidHeaderPresent |= StandardHeaders.X_WISE_UUID.equals(header.getKey());
}
}
+ if (properties.isUuidHeaderRequired() && !uuidHeaderPresent) {
+ throw new IllegalArgumentException(
+ "%d: Message is required to have @{%s} header.".formatted(
+ messageIdx,
+ StandardHeaders.X_WISE_UUID
+ )
+ );
+ }
}
/**
@@ -434,6 +443,24 @@ protected void validateMessageSize(TkmsMessage message, int messageIdx) {
}
}
+ private static void addStandardHeaders(TkmsMessage tkmsMessage) {
+ if (tkmsMessage.getPriority() != null) {
+ tkmsMessage.addHeader(
+ new Header()
+ .setKey(StandardHeaders.X_WISE_PRIORITY)
+ .setValue(tkmsMessage.getPriority().toString().getBytes(StandardCharsets.UTF_8))
+ );
+ }
+ // uuid shall remain last header, so it can be quickly accessed using Headers#lastHeader
+ if (tkmsMessage.getUuid() != null) {
+ tkmsMessage.addHeader(
+ new Header()
+ .setKey(StandardHeaders.X_WISE_UUID)
+ .setValue(tkmsMessage.getUuid().toString().getBytes(StandardCharsets.UTF_8))
+ );
+ }
+ }
+
private int utf8Length(CharSequence s) {
if (s == null) {
return 0;
diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java
index cd394b5..98e2dc9 100644
--- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java
+++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java
@@ -1,11 +1,13 @@
package com.transferwise.kafka.tkms.api;
+import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.kafka.tkms.CompressionAlgorithm;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.PositiveOrZero;
@@ -71,6 +73,31 @@ public class TkmsMessage {
*/
private Map, ?> metadata;
+ /**
+ * Uniquely identifies this message for consumers.
+ *
+ * If value is set then it will be added to headers with {@code x-wise-uuid} key.
+ *
+ *
Having UUID in header allows consumer to run deduplication check on this value without need to deserialize payload.
+ * If payload provides uuid it must be the same as this value so that consumers that depend on either of these values can have consistent
+ * deduplication.
+ *
+ *
Prefer using sequential uuids (e.g. {@link UuidUtils#generatePrefixCombUuid()}) which are proved to yield better performance.
+ */
+ private UUID uuid;
+
+ /**
+ * Defines priority of this message for consumers.
+ *
+ *
Lower value means higher priority. For example, 0 is higher priority than 10.
+ *
+ *
If value is set then it will be added to headers with {@code x-wise-priority} key.
+ *
+ *
Having priority in header allows consumer to derive priority without need to deserialize payload. For example, it can be useful
+ * when consumers filter messages based on priority before deciding how to process those.
+ */
+ private Long priority;
+
public TkmsMessage addHeader(Header header) {
if (headers == null) {
headers = new ArrayList<>();
diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java
index a2c7468..4a1b523 100644
--- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java
+++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java
@@ -3,6 +3,7 @@
import com.transferwise.common.baseutils.validation.LegacyResolvedValue;
import com.transferwise.common.baseutils.validation.ResolvedValue;
import com.transferwise.kafka.tkms.CompressionAlgorithm;
+import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import java.time.Duration;
import java.util.ArrayList;
@@ -238,6 +239,11 @@ public void afterPropertiesSet() {
*/
private boolean deferMessageRegistrationUntilCommit = false;
+ /**
+ * Whether every message sent is required to have {@code x-wise-uuid} header. See {@link TkmsMessage#getUuid()} for more details.
+ */
+ private boolean uuidHeaderRequired = true;
+
@Valid
@jakarta.validation.Valid
private Compression compression = new Compression();
diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java
index 9963190..6cae9fb 100644
--- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java
+++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java
@@ -8,15 +8,9 @@
import com.transferwise.kafka.tkms.config.TkmsProperties;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
-import io.micrometer.core.instrument.Meter.Type;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
-import io.micrometer.core.instrument.config.MeterFilter;
-import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import java.time.Instant;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Data;
@@ -37,6 +31,7 @@ public class TkmsMetricsTemplate implements ITkmsMetricsTemplate, InitializingBe
// Miccrometer 1.13 (which comes with Spring boot 3.3) doesn't properly convert gauge metrics with info suffix when using underscore,
// using dot here as a workaround
public static final String GAUGE_LIBRARY_INFO = "tw.library.info";
+ public static final String GAUGE_CONFIGURATION_UUID_HEADER_REQUIRED = "tw_tkms_configuration_uuid_header_required";
public static final String TIMER_PROXY_POLL = "tw_tkms_proxy_poll";
public static final String GAUGE_PROXY_POLL_IN_PROGRESS = "tw_tkms_proxy_poll_in_progress";
public static final String TIMER_PROXY_CYCLE = "tw_tkms_proxy_cycle";
@@ -288,6 +283,9 @@ public void registerLibrary() {
Gauge.builder(GAUGE_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tkms")
.description("Provides metadata about the library, for example the version.")
.register(meterCache.getMeterRegistry());
+ Gauge.builder(GAUGE_CONFIGURATION_UUID_HEADER_REQUIRED, tkmsProperties, props -> props.isUuidHeaderRequired() ? 1d : 0d)
+ .description("0 - uuid header isn't required, 1 - uuid header is required")
+ .register(meterCache.getMeterRegistry());
}
@Override
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java
index 3a6796b..95b7397 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java
@@ -3,6 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
+import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.TkmsMessage;
@@ -106,7 +107,12 @@ void testIfEarliestMessageTrackerBehavesAsExpected() {
protected void sendMessageAndWaitForArrival() {
transactionsHelper.withTransaction().run(() -> {
- var result = tkms.sendMessage(new TkmsMessage().setTopic(testTopic).setValue("Hello Kristo!".getBytes(StandardCharsets.UTF_8)));
+ var result = tkms.sendMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(testTopic)
+ .setValue("Hello Kristo!".getBytes(StandardCharsets.UTF_8))
+ );
log.info("Registered a message with storage id " + result.getStorageId());
}
);
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java
index 5747db8..9b41f21 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java
@@ -1,5 +1,6 @@
package com.transferwise.kafka.tkms;
+import static com.transferwise.common.baseutils.UuidUtils.generatePrefixCombUuid;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
@@ -28,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
@@ -107,6 +109,8 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
for (int i = 0; i < messageMultiplier; i++) {
sb.append(messagePart);
}
+ var uuid = generatePrefixCombUuid();
+ var priority = 17L;
tkmsStorageToKafkaProxy.pause();
@@ -116,10 +120,20 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
Consumer> messageCounter = cr -> ExceptionUtils.doUnchecked(() -> {
TestEvent receivedEvent = objectMapper.readValue(cr.value(), TestEvent.class);
if (receivedEvent.getMessage().equals(message)) {
- assertThat(cr.headers().toArray()).hasSize(1);
- org.apache.kafka.common.header.Header header = cr.headers().toArray()[0];
- assertThat(header.key()).isEqualTo("x-tw-criticality");
- assertThat(new String(header.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol");
+ assertThat(cr.headers().toArray()).hasSize(3);
+
+ org.apache.kafka.common.header.Header criticalityHeader = cr.headers().toArray()[0];
+ assertThat(criticalityHeader.key()).isEqualTo("x-tw-criticality");
+ assertThat(new String(criticalityHeader.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol");
+
+ org.apache.kafka.common.header.Header priorityHeader = cr.headers().toArray()[1];
+ assertThat(priorityHeader.key()).isEqualTo(StandardHeaders.X_WISE_PRIORITY);
+ assertThat(Long.parseLong(new String(priorityHeader.value(), StandardCharsets.UTF_8))).isEqualTo(priority);
+
+ org.apache.kafka.common.header.Header uuidHeader = cr.headers().toArray()[2];
+ assertThat(uuidHeader.key()).isEqualTo(StandardHeaders.X_WISE_UUID);
+ assertThat(UUID.fromString(new String(uuidHeader.value(), StandardCharsets.UTF_8))).isEqualTo(uuid);
+
receivedCount.incrementAndGet();
} else {
throw new IllegalStateException("Wrong message receive: " + receivedEvent.getMessage());
@@ -133,9 +147,18 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
await().until(() -> tkmsStorageToKafkaProxy.isPaused());
transactionsHelper.withTransaction().run(() -> {
- var result = transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent))
- .addHeader(new Header().setKey("x-tw-criticality").setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8))));
+ var result = transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(uuid)
+ .setPriority(priority)
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent))
+ .addHeader(
+ new Header()
+ .setKey("x-tw-criticality")
+ .setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8))
+ )
+ );
var messagesCount = tkmsTestDao.getMessagesCount(result.getShardPartition());
if (deferUntilCommit) {
@@ -206,8 +229,12 @@ void testExactlyOnceDelivery(int scenario) throws Exception {
for (long i = 0; i < batchSize; i++) {
long id = finalT * threadsCount * batchesCount + finalB * batchesCount + i;
TestEvent testEvent = new TestEvent().setId(id).setMessage(message);
- transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(objectMapper.writeValueAsBytes(testEvent)));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(objectMapper.writeValueAsBytes(testEvent))
+ );
}
return null;
});
@@ -275,8 +302,14 @@ void testThatMessagesWithSameKeyEndUpInOnePartition(boolean deferUntilCommit) {
TestEvent testEvent = new TestEvent().setId(1L).setMessage(message);
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setKey(key).setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent))));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setKey(key)
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent))
+ )
+ );
}
await().until(() -> receivedCount.get() >= n);
@@ -312,10 +345,14 @@ void testThatMessagesWithSamePartitionEndUpInOnePartition(boolean deferUntilComm
TestEvent testEvent = new TestEvent().setId(1L).setMessage(message);
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender
- .sendMessage(
- new TkmsMessage().setPartition(partition).setTopic(testProperties.getTestTopic())
- .setValue(toJsonBytes(testEvent))));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setPartition(partition)
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent))
+ )
+ );
}
await().until(() -> receivedCount.get() >= n);
@@ -359,9 +396,14 @@ void testThatMessagesOrderForAnEntityIsPreserved(boolean deferUntilCommit) throw
long id = finalEntityId * entityEventsCount + i;
TestEvent testEvent = new TestEvent().setId(id).setEntityId(finalEntityId).setMessage(message);
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setKey(String.valueOf(finalEntityId)).setTopic(testProperties.getTestTopic())
- .setValue(toJsonBytes(testEvent))));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setKey(String.valueOf(finalEntityId))
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent))
+ )
+ );
checkIfTransactionContextsHaveBeenCleared();
}
@@ -429,8 +471,13 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo
for (int j = 0; j < batchSize; j++) {
var id = finalEntityId * entityEventsCount + i;
var testEvent = new TestEvent().setId(id).setEntityId(finalEntityId).setMessage(message);
- sendMessagesRequest.addTkmsMessage(new TkmsMessage().setKey(String.valueOf(finalEntityId)).setTopic(testProperties.getTestTopic())
- .setValue(toJsonBytes(testEvent)));
+ sendMessagesRequest.addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setKey(String.valueOf(finalEntityId))
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent))
+ );
i++;
}
@@ -489,9 +536,16 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean
var expectedMessage =
useAdminClient ? "Topic 'NotExistingTopic' does not exist." : "Topic NotExistingTopic not present in metadata after";
- assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8)))))
- .hasMessageContaining(expectedMessage);
+ assertThatThrownBy(
+ () -> transactionsHelper.withTransaction().run(
+ () -> transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic("NotExistingTopic")
+ .setValue("Stuff".getBytes(StandardCharsets.UTF_8))
+ )
+ )
+ ).hasMessageContaining(expectedMessage);
} finally {
// Stop logs spam about not existing topic in metadata.
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
@@ -503,9 +557,14 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean
void sendingOutMessagesWithoutActiveTransactionsWillFail(boolean deferUntilCommit) {
setupConfig(deferUntilCommit);
- assertThatThrownBy(() -> transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))
- .isInstanceOf(IllegalStateException.class)
+ assertThatThrownBy(
+ () -> transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic("NotExistingTopic")
+ .setValue("Stuff".getBytes(StandardCharsets.UTF_8))
+ )
+ ).isInstanceOf(IllegalStateException.class)
.hasMessageContaining("No active transaction detected.");
}
@@ -526,10 +585,10 @@ void sendingMultipleMessagesWorks(boolean deferUntilCommit) {
SendMessagesResult sendMessagesResult =
transactionsHelper.withTransaction().call(() -> transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(0))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(1))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(1))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(0))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(1))
));
assertThat(sendMessagesResult.getResults().size()).isEqualTo(4);
@@ -577,21 +636,21 @@ void sendingMultipleMessagesWithMultipleStatementsWorks(boolean deferUntilCommit
transactionsHelper.withTransaction().run(() -> {
transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(0))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(1))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(0))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(1))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(1))
);
transactionalKafkaMessageSender.sendMessage(new SendMessageRequest().setDeferMessageRegistrationUntilCommit(deferUntilCommit)
- .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)));
+ .setTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value)));
transactionalKafkaMessageSender.sendMessage(new SendMessageRequest()
- .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)));
+ .setTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value)));
transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value))
+ .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value))
);
var messagesCount = tkmsTestDao.getMessagesCount(TkmsShardPartition.of(0, 0))
@@ -637,46 +696,98 @@ void mixingDeferredAndNotDeferredMessagesIsPrevented(boolean deferUntilCommit) {
assertThatThrownBy(() -> {
transactionsHelper.withTransaction().run(() -> {
- transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest().addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ )
);
- transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest()
+ .setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ )
);
});
}).hasMessage("You can not mix deferred and not deferred messages in the same transaction, as it will break the ordering guarantees.");
assertThatThrownBy(() -> {
transactionsHelper.withTransaction().run(() -> {
- transactionalKafkaMessageSender.sendMessage(new SendMessageRequest()
- .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
+ transactionalKafkaMessageSender.sendMessage(
+ new SendMessageRequest().setTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ )
);
- transactionalKafkaMessageSender.sendMessage(new SendMessageRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
- .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))
+ transactionalKafkaMessageSender.sendMessage(
+ new SendMessageRequest()
+ .setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
+ .setTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ )
);
});
}).hasMessage("You can not mix deferred and not deferred messages in the same transaction, as it will break the ordering guarantees.");
// We can mix it between shard-partitions, because between those the order is not important.
transactionsHelper.withTransaction().run(() -> {
- transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0))
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest().addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ .setShard(0)
+ )
);
- transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1))
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest()
+ .setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ .setShard(1)
+ )
);
});
transactionsHelper.withTransaction().run(() -> {
- transactionalKafkaMessageSender.sendMessage(new SendMessageRequest()
- .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0))
+ transactionalKafkaMessageSender.sendMessage(
+ new SendMessageRequest().setTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ .setShard(0)
+ )
);
- transactionalKafkaMessageSender.sendMessage(new SendMessageRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
- .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1))
+ transactionalKafkaMessageSender.sendMessage(
+ new SendMessageRequest()
+ .setDeferMessageRegistrationUntilCommit(!deferUntilCommit)
+ .setTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue(value)
+ .setShard(1)
+ )
);
});
@@ -712,23 +823,39 @@ void testThatSendingLargeMessagesWillNotCauseAnIssue(boolean deferUntilCommit) {
transactionsHelper.withTransaction().call(() ->
transactionalKafkaMessageSender
.sendMessage(
- new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(message.getValue().getBytes(StandardCharsets.US_ASCII)))))
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(message.getValue().getBytes(StandardCharsets.US_ASCII))
+ )
+ ))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("0: Estimated message size is 10485878, which is larger than maximum of 10485760.");
+ .hasMessage("0: Estimated message size is 10485937, which is larger than maximum of 10485760.");
assertThatThrownBy(() ->
transactionsHelper.withTransaction().call(() ->
- transactionalKafkaMessageSender
- .sendMessages(new SendMessagesRequest().addTkmsMessage(
- new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(message.getValue().getBytes(StandardCharsets.US_ASCII))))))
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest().addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(message.getValue().getBytes(StandardCharsets.US_ASCII))
+ )
+ )
+ )
+ )
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("0: Estimated message size is 10485878, which is larger than maximum of 10485760.");
+ .hasMessage("0: Estimated message size is 10485937, which is larger than maximum of 10485760.");
message.setValue(message.getValue().substring(0, 10484000));
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender
- .sendMessage(
- new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(message.getValue().getBytes(StandardCharsets.US_ASCII))));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(message.getValue().getBytes(StandardCharsets.US_ASCII))
+ )
+ );
await().until(() -> receivedCount.get() > 0);
@@ -768,8 +895,13 @@ void testThatTemporaryDeleteFailureDoesNotLeaveTrashBehind(boolean deferUntilCom
try {
for (int i = 0; i < messagesCount; i++) {
TestEvent testEvent = new TestEvent().setId((long) i).setMessage(message);
- transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent))));
+ transactionsHelper.withTransaction().run(
+ () -> transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent)))
+ );
}
await().until(() -> receivedCount.get() >= messagesCount);
@@ -827,10 +959,20 @@ void testThatInsertFailureDoesNotLeaveTrashBehind(boolean deferUntilCommit) {
var testEvent1 = new TestEvent().setId(1L).setMessage(message);
assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> {
- transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent0)).setShard(0));
- transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent1)).setShard(1));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent0))
+ .setShard(0)
+ );
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(generatePrefixCombUuid())
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent1))
+ .setShard(1)
+ );
})).hasMessage("Haha, inserts are failing lol.");
assertThat(receivedCount.get()).isEqualTo(0);
@@ -854,12 +996,12 @@ private static Stream compressionInput() {
var arguments = new ArrayList();
for (var deferUntilCommit : deferUntilCommits) {
- arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 102, 103, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1163, 1163, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 126, 126, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 158, 158, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 156, 156, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 92, 92, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 156, 157, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1218, 1218, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 182, 182, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 214, 214, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 212, 212, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 147, 147, deferUntilCommit));
}
return arguments.stream();
@@ -897,9 +1039,14 @@ void testMessageIsCompressed(CompressionAlgorithm algorithm, int expectedSeriali
TestEvent testEvent = new TestEvent().setId(1L).setMessage(message);
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender
- .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent))
- .setCompression(new Compression().setAlgorithm(algorithm))));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(UUID.fromString("7554ffe0-4da2-4de9-bebf-f3131fd7a84a"))
+ .setTopic(testProperties.getTestTopic())
+ .setValue(toJsonBytes(testEvent))
+ .setCompression(new Compression().setAlgorithm(algorithm))
+ )
+ );
await().until(() -> receivedCount.get() > 0);
waitUntilTablesAreEmpty();
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java
index 59d04e1..e49e426 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java
@@ -2,6 +2,7 @@
import static org.awaitility.Awaitility.await;
+import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
@@ -74,7 +75,13 @@ void fillTkmsTable() {
try {
transactionsHelper.withTransaction().run(() -> {
for (int i = 0; i < batchSize; i++) {
- transactionalKafkaMessageSender.sendMessage(new TkmsMessage().setTopic("TestTopicPostgres").setKey("a").setValue(contentBytes));
+ transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic("TestTopicPostgres")
+ .setKey("a")
+ .setValue(contentBytes)
+ );
}
});
log.info("Inserted batch #" + finalT);
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java
index 8a24b17..3d70ac2 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java
@@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest;
import com.transferwise.kafka.tkms.api.TkmsMessage;
@@ -42,10 +43,26 @@ void messagesAreDecorateWithJambi() {
String topic = testProperties.getTestTopic();
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue))
- ));
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest()
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(topic)
+ .setKey("adam-jones")
+ .setShard(4)
+ .setValue(someValue)
+ )
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(topic)
+ .setKey("danny-carey")
+ .setPartition(5)
+ .setValue(someValue)
+ )
+ )
+ );
await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java
index 29b2214..352597b 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java
@@ -3,6 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
+import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptor.MessageInterceptionDecision;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest;
@@ -67,11 +68,31 @@ void messagesCanBeIntercepted() {
String topic = testProperties.getTestTopic();
transactionsHelper.withTransaction().run(() ->
- transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("A").setValue(someValue))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("B").setValue(someValue))
- .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("C").setValue(someValue))
- ));
+ transactionalKafkaMessageSender.sendMessages(
+ new SendMessagesRequest()
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(topic)
+ .setKey("A")
+ .setValue(someValue)
+ )
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(topic)
+ .setKey("B")
+ .setValue(someValue)
+ )
+ .addTkmsMessage(
+ new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(topic)
+ .setKey("C")
+ .setValue(someValue)
+ )
+ )
+ );
await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java
index 0773705..112c77d 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java
@@ -2,6 +2,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
@@ -62,7 +63,10 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean
protected void sendMessage(String topic, Integer shard) {
try {
transactionsHelper.withTransaction().run(() -> {
- var message = new TkmsMessage().setTopic(topic).setValue("Stuff".getBytes(StandardCharsets.UTF_8));
+ var message = new TkmsMessage()
+ .setUuid(UuidUtils.generatePrefixCombUuid())
+ .setTopic(topic)
+ .setValue("Stuff".getBytes(StandardCharsets.UTF_8));
if (shard != null) {
message.setShard(shard);
}
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java
index c1e9a71..c6721b3 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java
@@ -16,12 +16,12 @@ private static Stream compressionInput() {
var arguments = new ArrayList();
for (var deferUntilCommit : deferUntilCommits) {
- arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 111, 110, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1172, 1171, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 135, 134, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 164, 160, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 162, 158, deferUntilCommit));
- arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 101, 100, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 164, 165, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1226, 1226, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 190, 190, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 216, 216, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 214, 214, deferUntilCommit));
+ arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 155, 155, deferUntilCommit));
}
return arguments.stream();
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java
index 8791613..29a5449 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java
@@ -7,7 +7,10 @@
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.BaseTestEnvironment;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
@@ -20,9 +23,17 @@ class TkmsMessageValidationIntTest extends BaseIntTest {
@Autowired
private ITransactionsHelper transactionsHelper;
+ private boolean uuidHeaderRequired;
+
+ @BeforeEach
+ public void saveConfig() {
+ uuidHeaderRequired = tkmsProperties.isUuidHeaderRequired();
+ }
+
@AfterEach
public void cleanup() {
tkmsProperties.setDeferMessageRegistrationUntilCommit(false);
+ tkmsProperties.setUuidHeaderRequired(uuidHeaderRequired);
}
protected void setupConfig(boolean deferUntilCommit) {
@@ -40,4 +51,35 @@ void invalidMessagesDoNotPassValidation(boolean deferUntilCommit) {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("0: No topic provided.");
}
+
+ @Test
+ void failsToSendMessageThatHasNoUuidWhenUuidHeaderIsRequired() {
+ tkmsProperties.setUuidHeaderRequired(true);
+
+ assertThatThrownBy(
+ () -> transactionsHelper.withTransaction().run(
+ () -> transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setTopic(testProperties.getTestTopic())
+ .setValue(new byte[1])
+ )
+ )
+ ).isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("0: Message is required to have @{x-wise-uuid} header.");
+ }
+
+ @Test
+ void doesNotRequireUuidPresentIfItIsNotRequiredByConfiguration() {
+ tkmsProperties.setUuidHeaderRequired(false);
+
+ transactionsHelper.withTransaction().run(
+ () -> transactionalKafkaMessageSender.sendMessage(
+ new TkmsMessage()
+ .setTopic(testProperties.getTestTopic())
+ .setValue(new byte[1])
+ )
+ );
+
+ Awaitility.await().until(() -> tkmsSentMessagesCollector.getSentMessages(testProperties.getTestTopic()).size() == 1);
+ }
}