Skip to content

Commit

Permalink
fix: produce tombstone records (#547)
Browse files Browse the repository at this point in the history
* fix: produce tombstone records

* fix: cr

* fix: fix integration test after cr
  • Loading branch information
fasazevedo authored Mar 27, 2024
1 parent 79f8b1b commit 082baf3
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using KafkaFlow.Middlewares.Serializer.Resolvers;
using Microsoft.IO;

Expand Down Expand Up @@ -38,18 +39,21 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
await _typeResolver.OnProduceAsync(context);

byte[] messageValue;
byte[] messageValue = Array.Empty<byte>();

using (var buffer = s_memoryStreamManager.GetStream())
if (context.Message.Value is not null)
{
await _serializer
.SerializeAsync(
context.Message.Value,
buffer,
new SerializerContext(context.ProducerContext.Topic))
.ConfigureAwait(false);

messageValue = buffer.ToArray();
using (var buffer = s_memoryStreamManager.GetStream())
{
await _serializer
.SerializeAsync(
context.Message.Value,
buffer,
new SerializerContext(context.ProducerContext.Topic))
.ConfigureAwait(false);

messageValue = buffer.ToArray();
}
}

await next(context.SetMessage(context.Message.Key, messageValue)).ConfigureAwait(false);
Expand Down
25 changes: 25 additions & 0 deletions tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ internal static class Bootstrapper
internal const string PauseResumeGroupId = "consumer-pause-resume";
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";
internal const string NullGroupId = "consumer-null";


private const string ProtobufTopicName = "test-protobuf";
private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr";
Expand All @@ -39,6 +41,7 @@ internal static class Bootstrapper
private const string ProtobufGzipTopicName = "test-protobuf-gzip";
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
private const string AvroTopicName = "test-avro";
private const string NullTopicName = "test-null";

private static readonly Lazy<IServiceProvider> s_lazyProvider = new(SetupProvider);

Expand Down Expand Up @@ -198,6 +201,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.CreateTopicIfNotExists(JsonGzipTopicName, 2, 1)
.CreateTopicIfNotExists(ProtobufGzipTopicName, 2, 1)
.CreateTopicIfNotExists(ProtobufGzipTopicName2, 2, 1)
.CreateTopicIfNotExists(NullTopicName, 1, 1)
.AddConsumer(
consumer => consumer
.Topic(ProtobufTopicName)
Expand Down Expand Up @@ -249,6 +253,21 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<MessageHandler>())))
.AddConsumer(
consumer => consumer
.Topic(NullTopicName)
.WithGroupId(NullGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddTypedHandlers(
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<NullMessageHandler>()
)))
.AddConsumer(
consumer => consumer
.Topics(GzipTopicName)
Expand Down Expand Up @@ -298,6 +317,12 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()))
.AddProducer<NullProducer>(
producer => producer
.DefaultTopic(NullTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()))
.AddProducer<JsonGzipProducer>(
producer => producer
.DefaultTopic(JsonGzipTopicName)
Expand Down
21 changes: 21 additions & 0 deletions tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal static class MessageStorage
private static readonly ConcurrentBag<TestProtoMessage> s_protoMessages = new();
private static readonly ConcurrentBag<(long, int)> s_versions = new();
private static readonly ConcurrentBag<byte[]> s_byteMessages = new();
private static readonly ConcurrentBag<byte[]> s_nullMessages = new();

public static void Add(ITestMessage message)
{
Expand All @@ -39,6 +40,11 @@ public static void Add(byte[] message)
s_byteMessages.Add(message);
}

public static void AddNullMessage(byte[] message)
{
s_nullMessages.Add(message);
}

public static async Task AssertCountMessageAsync(ITestMessage message, int count)
{
var start = DateTime.Now;
Expand Down Expand Up @@ -119,6 +125,21 @@ public static async Task AssertMessageAsync(byte[] message)
}
}

public static async Task AssertNullMessageAsync()
{
var start = DateTime.Now;
while (!s_nullMessages.IsEmpty)
{
if (DateTime.Now.Subtract(start).Seconds > TimeoutSec)
{
Assert.Fail("Null message not received");
return;
}

await Task.Delay(100).ConfigureAwait(false);
}
}

public static List<(long ticks, int version)> GetVersions()
{
return s_versions.ToList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;

namespace KafkaFlow.IntegrationTests.Core.Handlers;

internal class NullMessageHandler : IMessageHandler<byte[]>
{
public Task Handle(IMessageContext context, byte[] message)
{
MessageStorage.AddNullMessage(message);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.IntegrationTests.Core.Producers;

public class NullProducer
{

}
14 changes: 14 additions & 0 deletions tests/KafkaFlow.IntegrationTests/ProducerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,18 @@ public async Task ProduceNullKeyTest()
// Assert
await MessageStorage.AssertMessageAsync(message);
}

[TestMethod]
public async Task ProduceNullMessageTest()
{
// Arrange
var producer = _provider.GetRequiredService<IMessageProducer<NullProducer>>();
var key = Guid.NewGuid().ToString();

// Act
await producer.ProduceAsync(key, Array.Empty<byte>());

// Assert
await MessageStorage.AssertNullMessageAsync();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -85,6 +86,60 @@ await _target.Invoke(
_typeResolverMock.VerifyAll();
}

[TestMethod]
public async Task Invoke_NullMessage_Serialize()
{
// Arrange
byte[] rawMessage = null;
var key = new object();
var deserializedMessage = new Message(key, new TestMessage());
IMessageContext resultContext = null;
var producerContext = new Mock<IProducerContext>();
producerContext.SetupGet(x => x.Topic).Returns("test-topic");

var transformedContextMock = new Mock<IMessageContext>();

_contextMock
.SetupGet(x => x.Message)
.Returns(deserializedMessage);

_typeResolverMock.Setup(x => x.OnProduceAsync(_contextMock.Object));

_serializerMock
.Setup(
x => x.SerializeAsync(
deserializedMessage.Value,
It.IsAny<Stream>(),
It.IsAny<ISerializerContext>()))
.Callback((object _, Stream stream, ISerializerContext _) => stream.WriteAsync(rawMessage));

_contextMock
.Setup(x => x.SetMessage(key, It.IsAny<IEnumerable<byte>>()))
.Returns(transformedContextMock.Object);

_contextMock
.SetupGet(x => x.ProducerContext)
.Returns(producerContext.Object);

// Act
await _target.Invoke(
_contextMock.Object,
ctx =>
{
resultContext = ctx;
return Task.CompletedTask;
});

// Assert
resultContext.Should().NotBeNull();
resultContext.Should().Be(transformedContextMock.Object);
resultContext.Message.Value.Should().BeNull();
_contextMock.VerifyAll();
_serializerMock.VerifyAll();
_typeResolverMock.VerifyAll();
}


private class TestMessage
{
}
Expand Down
2 changes: 2 additions & 0 deletions website/docs/guides/middlewares/serializer-middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Both classes can be provided as an argument through a factory method too.
For topics that have just one message type, use the `AddSingleTypeSerializer`/`AddSingleTypeDeserializer` method.
:::

Serializer middleware also handles the produce of tombstone records. The messages produced are `null` whenever the message value is null, but not when that value is an empty `byte` array.


```csharp
services.AddKafka(kafka => kafka
Expand Down

1 comment on commit 082baf3

@mrt181
Copy link

@mrt181 mrt181 commented on 082baf3 Aug 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix never produces a tombstone null value but defaults to an empty byte array when the provided value is null?
It doesn't even allow to use a custom serializer in case the value is null.
The documentation is updated to say that a null is needed and not an empty byte array to create a tombstone record?

Please sign in to comment.