Skip to content

Commit

Permalink
Variable quality of service on Kafka topics. Closes GH-602
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Oct 20, 2023
1 parent fcf79fd commit 1f500bc
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/Transports/Kafka/Wolverine.Kafka.Tests/compliance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async Task InitializeAsync()

await SenderIs(opts =>
{
opts.UseKafka("localhost:29092");
opts.UseKafka("localhost:29092").ConfigureConsumers(x => x.EnableAutoCommit = false);

opts.ListenToKafkaTopic(senderTopic);

Expand Down
26 changes: 17 additions & 9 deletions src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal class KafkaListener : IListener, IDisposable
private readonly ConsumerConfig _config;
private readonly IReceiver _receiver;
private readonly string? _messageTypeName;
private readonly QualityOfService _qualityOfService;

public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver,
ILogger<KafkaListener> logger)
Expand All @@ -27,14 +28,21 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver
_config = config;
_receiver = receiver;

_qualityOfService = _config.EnableAutoCommit.HasValue && !_config.EnableAutoCommit.Value
? QualityOfService.AtMostOnce
: QualityOfService.AtLeastOnce;

_runner = Task.Run(async () =>
{
_consumer.Subscribe(topic.TopicName);
try
{
while (!_cancellation.IsCancellationRequested)
{
// TODO -- watch that this isn't EnableAutoCommit = false
if (_qualityOfService == QualityOfService.AtMostOnce)
{
_consumer.Commit();
}

try
{
Expand All @@ -43,10 +51,13 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver

var envelope = mapper.CreateEnvelope(result.Topic, message);
envelope.MessageType ??= _messageTypeName;

await receiver.ReceivedAsync(this, envelope);



await receiver.ReceivedAsync(this, envelope);
}
catch (OperationCanceledException)
{
// we're done here!
}
catch (Exception e)
{
Expand All @@ -67,11 +78,8 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, IReceiver receiver

public ValueTask CompleteAsync(Envelope envelope)
{
if (_config.EnableAutoCommit != null)
{
_consumer.Commit();
}

// do nothing here, it's already ack'd before we get here

return ValueTask.CompletedTask;
}

Expand Down
13 changes: 13 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,17 @@ await client.CreateTopicsAsync(new[]
throw;
}
}
}

public enum QualityOfService
{
/// <summary>
/// "At least once" delivery guarantee by auto-ack'ing incoming messages
/// </summary>
AtLeastOnce,

/// <summary>
/// "At most once" delivery guarantee by trying to ack received messages before processing
/// </summary>
AtMostOnce
}

0 comments on commit 1f500bc

Please sign in to comment.