From 1f500bce3f6a9e832f5f9bba9a4153bd20bea408 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Fri, 20 Oct 2023 11:54:48 -0500 Subject: [PATCH] Variable quality of service on Kafka topics. Closes GH-602 --- .../Kafka/Wolverine.Kafka.Tests/compliance.cs | 2 +- .../Internals/KafkaListener.cs | 26 ++++++++++++------- .../Kafka/Wolverine.Kafka/KafkaTopic.cs | 13 ++++++++++ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs index c674d152f..cad709346 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs @@ -27,7 +27,7 @@ public async Task InitializeAsync() await SenderIs(opts => { - opts.UseKafka("localhost:29092"); + opts.UseKafka("localhost:29092").ConfigureConsumers(x => x.EnableAutoCommit = false); opts.ListenToKafkaTopic(senderTopic); diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs index f956a6125..741090506 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -14,6 +14,7 @@ internal class KafkaListener : IListener, IDisposable private readonly ConsumerConfig _config; private readonly IReceiver _receiver; private readonly string? _messageTypeName; + private readonly QualityOfService _qualityOfService; public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver, ILogger logger) @@ -27,6 +28,10 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver _config = config; _receiver = receiver; + _qualityOfService = _config.EnableAutoCommit.HasValue && !_config.EnableAutoCommit.Value + ? QualityOfService.AtMostOnce + : QualityOfService.AtLeastOnce; + _runner = Task.Run(async () => { _consumer.Subscribe(topic.TopicName); @@ -34,7 +39,10 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver { while (!_cancellation.IsCancellationRequested) { - // TODO -- watch that this isn't EnableAutoCommit = false + if (_qualityOfService == QualityOfService.AtMostOnce) + { + _consumer.Commit(); + } try { @@ -43,10 +51,13 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver var envelope = mapper.CreateEnvelope(result.Topic, message); envelope.MessageType ??= _messageTypeName; - - await receiver.ReceivedAsync(this, envelope); - + + await receiver.ReceivedAsync(this, envelope); + } + catch (OperationCanceledException) + { + // we're done here! } catch (Exception e) { @@ -67,11 +78,8 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver public ValueTask CompleteAsync(Envelope envelope) { - if (_config.EnableAutoCommit != null) - { - _consumer.Commit(); - } - + // do nothing here, it's already ack'd before we get here + return ValueTask.CompletedTask; } diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index 5ea010206..f15bf89a4 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -105,4 +105,17 @@ await client.CreateTopicsAsync(new[] throw; } } +} + +public enum QualityOfService +{ + /// + /// "At least once" delivery guarantee by auto-ack'ing incoming messages + /// + AtLeastOnce, + + /// + /// "At most once" delivery guarantee by trying to ack received messages before processing + /// + AtMostOnce } \ No newline at end of file