diff --git a/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs b/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs index 265ed0a7a..0c7b3f9ed 100644 --- a/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs +++ b/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using KafkaFlow.Middlewares.Serializer.Resolvers; using Microsoft.IO; @@ -38,18 +39,21 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { await _typeResolver.OnProduceAsync(context); - byte[] messageValue; + byte[] messageValue = Array.Empty(); - using (var buffer = s_memoryStreamManager.GetStream()) + if (context.Message.Value is not null) { - await _serializer - .SerializeAsync( - context.Message.Value, - buffer, - new SerializerContext(context.ProducerContext.Topic)) - .ConfigureAwait(false); - - messageValue = buffer.ToArray(); + using (var buffer = s_memoryStreamManager.GetStream()) + { + await _serializer + .SerializeAsync( + context.Message.Value, + buffer, + new SerializerContext(context.ProducerContext.Topic)) + .ConfigureAwait(false); + + messageValue = buffer.ToArray(); + } } await next(context.SetMessage(context.Message.Key, messageValue)).ConfigureAwait(false); diff --git a/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index e4fbefcff..272df2b19 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -29,6 +29,8 @@ internal static class Bootstrapper internal const string PauseResumeGroupId = "consumer-pause-resume"; internal const string AvroGroupId = "consumer-avro"; internal const string JsonGroupId = "consumer-json"; + internal const string NullGroupId = "consumer-null"; + private const string ProtobufTopicName = "test-protobuf"; private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr"; @@ -39,6 +41,7 @@ internal static class Bootstrapper private const string ProtobufGzipTopicName = "test-protobuf-gzip"; private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2"; private const string AvroTopicName = "test-avro"; + private const string NullTopicName = "test-null"; private static readonly Lazy s_lazyProvider = new(SetupProvider); @@ -198,6 +201,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .CreateTopicIfNotExists(JsonGzipTopicName, 2, 1) .CreateTopicIfNotExists(ProtobufGzipTopicName, 2, 1) .CreateTopicIfNotExists(ProtobufGzipTopicName2, 2, 1) + .CreateTopicIfNotExists(NullTopicName, 1, 1) .AddConsumer( consumer => consumer .Topic(ProtobufTopicName) @@ -249,6 +253,21 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection handlers .WithHandlerLifetime(InstanceLifetime.Singleton) .AddHandlersFromAssemblyOf()))) + .AddConsumer( + consumer => consumer + .Topic(NullTopicName) + .WithGroupId(NullGroupId) + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler() + ))) .AddConsumer( consumer => consumer .Topics(GzipTopicName) @@ -298,6 +317,12 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .AddMiddlewares( middlewares => middlewares .AddSerializer())) + .AddProducer( + producer => producer + .DefaultTopic(NullTopicName) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer())) .AddProducer( producer => producer .DefaultTopic(JsonGzipTopicName) diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs index 2b8ce13f3..40fc634e7 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs @@ -17,6 +17,7 @@ internal static class MessageStorage private static readonly ConcurrentBag s_protoMessages = new(); private static readonly ConcurrentBag<(long, int)> s_versions = new(); private static readonly ConcurrentBag s_byteMessages = new(); + private static readonly ConcurrentBag s_nullMessages = new(); public static void Add(ITestMessage message) { @@ -39,6 +40,11 @@ public static void Add(byte[] message) s_byteMessages.Add(message); } + public static void AddNullMessage(byte[] message) + { + s_nullMessages.Add(message); + } + public static async Task AssertCountMessageAsync(ITestMessage message, int count) { var start = DateTime.Now; @@ -119,6 +125,21 @@ public static async Task AssertMessageAsync(byte[] message) } } + public static async Task AssertNullMessageAsync() + { + var start = DateTime.Now; + while (!s_nullMessages.IsEmpty) + { + if (DateTime.Now.Subtract(start).Seconds > TimeoutSec) + { + Assert.Fail("Null message not received"); + return; + } + + await Task.Delay(100).ConfigureAwait(false); + } + } + public static List<(long ticks, int version)> GetVersions() { return s_versions.ToList(); diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/NullMessageHandler.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/NullMessageHandler.cs new file mode 100644 index 000000000..4d9b546be --- /dev/null +++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/NullMessageHandler.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; + +namespace KafkaFlow.IntegrationTests.Core.Handlers; + +internal class NullMessageHandler : IMessageHandler +{ + public Task Handle(IMessageContext context, byte[] message) + { + MessageStorage.AddNullMessage(message); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/tests/KafkaFlow.IntegrationTests/Core/Producers/NullProducer.cs b/tests/KafkaFlow.IntegrationTests/Core/Producers/NullProducer.cs new file mode 100644 index 000000000..0e9628211 --- /dev/null +++ b/tests/KafkaFlow.IntegrationTests/Core/Producers/NullProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.IntegrationTests.Core.Producers; + +public class NullProducer +{ + +} \ No newline at end of file diff --git a/tests/KafkaFlow.IntegrationTests/ProducerTest.cs b/tests/KafkaFlow.IntegrationTests/ProducerTest.cs index 8ca7c8c4a..fd8006258 100644 --- a/tests/KafkaFlow.IntegrationTests/ProducerTest.cs +++ b/tests/KafkaFlow.IntegrationTests/ProducerTest.cs @@ -36,4 +36,18 @@ public async Task ProduceNullKeyTest() // Assert await MessageStorage.AssertMessageAsync(message); } + + [TestMethod] + public async Task ProduceNullMessageTest() + { + // Arrange + var producer = _provider.GetRequiredService>(); + var key = Guid.NewGuid().ToString(); + + // Act + await producer.ProduceAsync(key, Array.Empty()); + + // Assert + await MessageStorage.AssertNullMessageAsync(); + } } diff --git a/tests/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/tests/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index 7b2b41950..27ac0c47f 100644 --- a/tests/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/tests/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -1,3 +1,4 @@ +using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; @@ -85,6 +86,60 @@ await _target.Invoke( _typeResolverMock.VerifyAll(); } + [TestMethod] + public async Task Invoke_NullMessage_Serialize() + { + // Arrange + byte[] rawMessage = null; + var key = new object(); + var deserializedMessage = new Message(key, new TestMessage()); + IMessageContext resultContext = null; + var producerContext = new Mock(); + producerContext.SetupGet(x => x.Topic).Returns("test-topic"); + + var transformedContextMock = new Mock(); + + _contextMock + .SetupGet(x => x.Message) + .Returns(deserializedMessage); + + _typeResolverMock.Setup(x => x.OnProduceAsync(_contextMock.Object)); + + _serializerMock + .Setup( + x => x.SerializeAsync( + deserializedMessage.Value, + It.IsAny(), + It.IsAny())) + .Callback((object _, Stream stream, ISerializerContext _) => stream.WriteAsync(rawMessage)); + + _contextMock + .Setup(x => x.SetMessage(key, It.IsAny>())) + .Returns(transformedContextMock.Object); + + _contextMock + .SetupGet(x => x.ProducerContext) + .Returns(producerContext.Object); + + // Act + await _target.Invoke( + _contextMock.Object, + ctx => + { + resultContext = ctx; + return Task.CompletedTask; + }); + + // Assert + resultContext.Should().NotBeNull(); + resultContext.Should().Be(transformedContextMock.Object); + resultContext.Message.Value.Should().BeNull(); + _contextMock.VerifyAll(); + _serializerMock.VerifyAll(); + _typeResolverMock.VerifyAll(); + } + + private class TestMessage { } diff --git a/website/docs/guides/middlewares/serializer-middleware.md b/website/docs/guides/middlewares/serializer-middleware.md index fded57bf8..ccba4baa9 100644 --- a/website/docs/guides/middlewares/serializer-middleware.md +++ b/website/docs/guides/middlewares/serializer-middleware.md @@ -26,6 +26,8 @@ Both classes can be provided as an argument through a factory method too. For topics that have just one message type, use the `AddSingleTypeSerializer`/`AddSingleTypeDeserializer` method. ::: +Serializer middleware also handles the produce of tombstone records. The messages produced are `null` whenever the message value is null, but not when that value is an empty `byte` array. + ```csharp services.AddKafka(kafka => kafka