Skip to content

Commit

Permalink
Add some locking in KafkaMessageConsumer
Browse files Browse the repository at this point in the history
Protect _consumers and _dispatchers against concurrent modifications from multiple threads
  • Loading branch information
douggish committed Sep 18, 2023
1 parent b89622c commit 59dc229
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions IO.Eventuate.Tram/Consumer/Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
Expand All @@ -29,8 +30,9 @@ public class KafkaMessageConsumer : IMessageConsumer, IAsyncDisposable

private readonly string _id = Guid.NewGuid().ToString();
private readonly string _bootstrapServers;
private readonly List<EventuateKafkaConsumer> _consumers = new List<EventuateKafkaConsumer>();
private readonly List<SwimlaneBasedDispatcher> _dispatchers = new List<SwimlaneBasedDispatcher>();
private readonly List<EventuateKafkaConsumer> _consumers = new();
private readonly List<SwimlaneBasedDispatcher> _dispatchers = new();
private readonly object _lockObject = new();

public KafkaMessageConsumer(string bootstrapServers,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
Expand Down Expand Up @@ -67,18 +69,24 @@ public IMessageSubscription Subscribe(string subscriberId, ISet<string> channels
_eventuateKafkaConsumerConfigurationProperties,
_loggerFactory);

_consumers.Add(kc);
_dispatchers.Add(swimLaneBasedDispatcher);
lock (_lockObject)
{
_consumers.Add(kc);
_dispatchers.Add(swimLaneBasedDispatcher);
}

kc.Start();

_logger.LogDebug($"-{logContext}");
return new MessageSubscription(async () =>
{
await swimLaneBasedDispatcher.StopAsync();
_dispatchers.Remove(swimLaneBasedDispatcher);
await kc.DisposeAsync();
_consumers.Remove(kc);
lock (_lockObject)
{
_dispatchers.Remove(swimLaneBasedDispatcher);
_consumers.Remove(kc);
}
});
}

Expand Down Expand Up @@ -108,18 +116,26 @@ public string GetId()
public async Task CloseAsync()
{
_logger.LogDebug($"+{nameof(CloseAsync)}");

foreach (SwimlaneBasedDispatcher dispatcher in _dispatchers)

List<SwimlaneBasedDispatcher> dispatchers;
List<EventuateKafkaConsumer> consumers;
lock (_lockObject)
{
dispatchers = _dispatchers.ToList();
_dispatchers.Clear();
consumers = _consumers.ToList();
_consumers.Clear();
}

foreach (SwimlaneBasedDispatcher dispatcher in dispatchers)
{
await dispatcher.StopAsync();
}
_dispatchers.Clear();

foreach (EventuateKafkaConsumer consumer in _consumers)
foreach (EventuateKafkaConsumer consumer in consumers)
{
await consumer.DisposeAsync();
}
_consumers.Clear();

_logger.LogDebug($"-{nameof(CloseAsync)}");
}
Expand Down

0 comments on commit 59dc229

Please sign in to comment.