diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BackpressureTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BackpressureTests.cs index db3727e..1cac07b 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BackpressureTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BackpressureTests.cs @@ -28,7 +28,7 @@ public async Task Setup() } }; TestSetup("eventuate", false, properties); - CleanupTest(); + await CleanupTestAsync(); } [TearDown] diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs index b47fa15..fa5b174 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs @@ -24,6 +24,8 @@ public class IntegrationTestsBase protected const string AggregateType12 = "TestMessage12Topic"; protected const string AggregateType34 = "TestMessage34Topic"; protected const string AggregateTypeDelay = "TestMessageDelayTopic"; + protected const string TestPartitionAssignmentTopic1 = "TestPartitionAssignmentTopic1"; + protected const string TestPartitionAssignmentTopic2 = "TestPartitionAssignmentTopic2"; protected string EventuateDatabaseSchemaName = "eventuate"; public static string PingFileName = "ping.txt"; @@ -75,9 +77,9 @@ protected void TestSetup(string schema, bool withInterceptor, EventuateKafkaCons } } - protected void CleanupTest() + protected async Task CleanupTestAsync() { - ClearDb(GetDbContext(), EventuateDatabaseSchemaName); + await ClearDbAsync(GetDbContext(), EventuateDatabaseSchemaName); GetTestConsumer().Reset(); GetTestMessageInterceptor()?.Reset(); } @@ -88,7 +90,7 @@ protected async Task CleanupKafkaTopics() config.BootstrapServers = TestSettings.KafkaBootstrapServers; using var admin = new AdminClientBuilder(config).Build(); Metadata kafkaMetadata = admin.GetMetadata(TimeSpan.FromSeconds(10)); - foreach (var topic in new[] {AggregateType12, AggregateType34, AggregateTypeDelay}) + foreach (var topic in new[] {AggregateType12, AggregateType34, AggregateTypeDelay, TestPartitionAssignmentTopic1, TestPartitionAssignmentTopic2}) { TopicMetadata paMessagesMetadata = kafkaMetadata.Topics.Find(t => t.Topic.Equals(topic)); if (paMessagesMetadata != null) @@ -212,10 +214,10 @@ protected EventuateTramDbContext GetDbContext() return _dbContext; } - protected void ClearDb(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName) + protected static async Task ClearDbAsync(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName) { - dbContext.Database.ExecuteSqlRaw(String.Format("Delete from [{0}].[message]", eventuateDatabaseSchemaName)); - dbContext.Database.ExecuteSqlRaw(String.Format("Delete from [{0}].[received_messages]", eventuateDatabaseSchemaName)); + await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[message]"); + await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[received_messages]"); } } } diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PartitionRebalancingTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PartitionRebalancingTests.cs new file mode 100644 index 0000000..35ab210 --- /dev/null +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PartitionRebalancingTests.cs @@ -0,0 +1,258 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using IO.Eventuate.Tram.Database; +using IO.Eventuate.Tram.Local.Kafka.Consumer; +using IO.Eventuate.Tram.Messaging.Common; +using IO.Eventuate.Tram.Messaging.Consumer; +using IO.Eventuate.Tram.Messaging.Producer; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Console; +using NUnit.Framework; + +namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures; + +[TestFixture(null)] +[TestFixture("cooperative-sticky")] +public class PartitionRebalancingTests : IntegrationTestsBase +{ + private readonly string _partitionAssignmentStrategyOverride; + private const string DbSchemaName = "eventuate"; + private ServiceProvider _rootServiceProvider; + private AsyncServiceScope _serviceScope; + private IMessageConsumer _consumer; + private IMessageProducer _producer; + + public PartitionRebalancingTests(string partitionAssignmentStrategyOverride) + { + _partitionAssignmentStrategyOverride = partitionAssignmentStrategyOverride; + } + + [SetUp] + public async Task SetUp() + { + _rootServiceProvider = CreateTestServiceProvider(_partitionAssignmentStrategyOverride); + _serviceScope = _rootServiceProvider.CreateAsyncScope(); + + IServiceProvider serviceProvider = _serviceScope.ServiceProvider; + + _consumer = serviceProvider.GetRequiredService(); + _producer = serviceProvider.GetRequiredService(); + + DbContextOptionsBuilder builder = new(); + builder.UseSqlServer(TestSettings.ConnectionStrings.EventuateTramDbConnection); + await using var dbContext = new EventuateTramDbContext(builder.Options, new EventuateSchema(DbSchemaName)); + await ClearDbAsync(dbContext, DbSchemaName); + await CleanupKafkaTopics(); + } + + [TearDown] + public async Task TearDown() + { + await _consumer.CloseAsync(); + await _serviceScope.DisposeAsync(); + await _rootServiceProvider.DisposeAsync(); + } + + [Test] + public async Task PartitionsRebalanced_MessagesInSwimlaneDispatcherQueues_MessagesConsumedOnceEachAndInOrder() + { + // Arrange + var topics = new HashSet + { + TestPartitionAssignmentTopic1, + TestPartitionAssignmentTopic2 + }; + + const string partitionId1 = "1"; + const string partitionId2 = "2"; + var partitionIds = new List + { + partitionId1, + partitionId2 + }; + + var sentMessagesByTopicPartition = new Dictionary<(string Topic, string PartitionId), List>(); + var consumerMessages = + new Dictionary<(string Topic, string PartitionId), + ConcurrentQueue<(IMessage Message, string ConsumerName, DateTimeOffset StartTime, DateTimeOffset EndTime)>>(); + + // Publish messages that will be written to 2 different partitions on each topic + foreach (string topic in topics) + { + foreach (string partitionId in partitionIds) + { + for (var messageNumber = 0; messageNumber < 10; messageNumber++) + { + var payload = $"topic:{topic}-partitionId:{partitionId}-message:{messageNumber}"; + IMessage message = MessageBuilder + .WithPayload(payload) + .WithHeader(MessageHeaders.PartitionId, partitionId) + .Build(); + + if (!sentMessagesByTopicPartition.TryGetValue((topic, partitionId), out List messages)) + { + messages = new List(); + sentMessagesByTopicPartition.Add((topic, partitionId), messages); + } + messages.Add(message); + + await _producer.SendAsync(topic, message); + } + + consumerMessages.Add((topic, partitionId), + new ConcurrentQueue<(IMessage Message, string consumerName, DateTimeOffset StartTime, DateTimeOffset EndTime)>()); + } + } + + var consumerGroupId = Guid.NewGuid().ToString(); + + var waitToConsume = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // Start a consumer where the message handler is waiting to allow messages to build up in the swimlane queues + _consumer.Subscribe(consumerGroupId, topics, + (m, _, ct) => HandleMessageAsync(waitToConsume.Task, m, "consumer1", consumerMessages, ct)); + + await WaitForStableConsumerGroupAsync(consumerGroupId); + + // Wait for SwimlaneDispatcher message queues to populate + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Act - Start a second consumer in the same consumer group to trigger a rebalance + _consumer.Subscribe(consumerGroupId, new HashSet { TestPartitionAssignmentTopic1 }, + (m, _, ct) => HandleMessageAsync(waitToConsume.Task, m, "consumer2", consumerMessages, ct)); + + await WaitForStableConsumerGroupAsync(consumerGroupId); + + // Wait for SwimlaneDispatcher message queues to populate + await Task.Delay(TimeSpan.FromSeconds(10)); + + // Allow processing to proceed + waitToConsume.SetResult(); + + // Assert + // Wait for messages to be processed + await Task.Delay(TimeSpan.FromSeconds(10)); + + foreach (string topic in topics) + { + foreach (string partitionId in partitionIds) + { + var context = $"topic={topic} partitionId={partitionId}"; + + List<(IMessage Message, string ConsumerName, DateTimeOffset StartTime, DateTimeOffset EndTime)> messages = + consumerMessages[(topic, partitionId)].ToList(); + + List expectedMessages = sentMessagesByTopicPartition[(topic, partitionId)]; + // Check that message count is correct + Assert.That(messages, Has.Count.EqualTo(expectedMessages.Count), context); + + // Check that messages were processed in the order they were published with no overlap in time + Assert.Multiple(() => + { + for (var i = 0; i < expectedMessages.Count; i++) + { + IMessage expectedMessage = expectedMessages[i]; + IMessage actualMessage = messages[i].Message; + Assert.That(actualMessage.Id, Is.EqualTo(expectedMessage.Id), $"{context} index={i} Id"); + Assert.That(actualMessage.Payload, Is.EqualTo(expectedMessage.Payload), $"{context} index={i} Payload"); + if (i > 0) + { + Assert.That(messages[i].StartTime, Is.GreaterThan(messages[i - 1].EndTime), + $"{context} index={i} start time less than previous end time"); + } + } + }); + } + } + } + + private static async Task HandleMessageAsync(Task waitBeforeConsumingTask, IMessage message, string consumerName, + IDictionary<(string Topic, string PartitionId), + ConcurrentQueue<(IMessage Message, string ConsumerName, DateTimeOffset StartTime, DateTimeOffset EndTime)>> consumerMessages, + CancellationToken cancellationToken) + { + await waitBeforeConsumingTask; + + cancellationToken.ThrowIfCancellationRequested(); + + DateTimeOffset startTime = DateTimeOffset.Now; + string topic = message.GetRequiredHeader(MessageHeaders.Destination); + string partitionId = message.GetRequiredHeader(MessageHeaders.PartitionId); + ConcurrentQueue<(IMessage Message, string ConsumerName, DateTimeOffset StartTime, DateTimeOffset EndTime)> messages = + consumerMessages[(topic, partitionId)]; + await Task.Delay(50, cancellationToken); + DateTimeOffset endTime = DateTimeOffset.Now; + + messages.Enqueue((message, consumerName, startTime, endTime)); + } + + private ServiceProvider CreateTestServiceProvider(string partitionAssignmentStrategy = null) + { + IServiceCollection services = new ServiceCollection(); + services.AddLogging(b => + b.AddSimpleConsole(o => + { + o.ColorBehavior = LoggerColorBehavior.Disabled; + o.TimestampFormat = "HH:mm:ss "; + }) + .AddFilter("Default", LogLevel.Information) + .AddFilter("IO.Eventuate", LogLevel.Debug)); + services.AddOptions(); + + EventuateKafkaConsumerConfigurationProperties consumerProperties = EventuateKafkaConsumerConfigurationProperties.Empty(); + if (partitionAssignmentStrategy != null) + { + consumerProperties.Properties["partition.assignment.strategy"] = partitionAssignmentStrategy; + } + + services.AddEventuateTramSqlKafkaTransport("eventuate", TestSettings.KafkaBootstrapServers, + consumerProperties, (_, options) => + { + options.UseSqlServer(TestSettings.ConnectionStrings.EventuateTramDbConnection); + }); + + return services.BuildServiceProvider(new ServiceProviderOptions + { ValidateScopes = true, ValidateOnBuild = true }); + } + + private async Task WaitForStableConsumerGroupAsync(string consumerGroupId) + { + TimeSpan stableConsumerTimeout = TimeSpan.FromSeconds(30); + + var kafkaAdminConfig = new AdminClientConfig { BootstrapServers = TestSettings.KafkaBootstrapServers }; + using IAdminClient adminClient = new AdminClientBuilder(kafkaAdminConfig).Build(); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + var isConsumerGroupStable = false; + while (!isConsumerGroupStable) + { + DescribeConsumerGroupsResult result = + await adminClient.DescribeConsumerGroupsAsync(new[] { consumerGroupId }); + if (result.ConsumerGroupDescriptions.Any() && + result.ConsumerGroupDescriptions[0].State == ConsumerGroupState.Stable) + { + isConsumerGroupStable = true; + stopwatch.Stop(); + } + else + { + if (stopwatch.Elapsed > stableConsumerTimeout) + { + throw new TimeoutException( + $"Timed out waiting for consumer group {consumerGroupId} to become stable after {stopwatch.Elapsed}."); + } + await Task.Delay(100); + } + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs index 44ace25..3fe1782 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs @@ -17,7 +17,7 @@ public async Task Setup() { await CleanupKafkaTopics(); TestSetup("eventuate", false, EventuateKafkaConsumerConfigurationProperties.Empty()); - CleanupTest(); + await CleanupTestAsync(); } [TearDown] diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs index b91fa19..5ddbe32 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs @@ -27,7 +27,7 @@ public async Task Setup() { await CleanupKafkaTopics(); TestSetup(_schema, true, EventuateKafkaConsumerConfigurationProperties.Empty()); - CleanupTest(); + await CleanupTestAsync(); } [TearDown] diff --git a/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml b/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml index 8bbb8bc..9edc25c 100644 --- a/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml +++ b/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml @@ -56,6 +56,8 @@ services: kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 --partitions 5 --replication-factor 1 --topic TestMessage12Topic && \ kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1 --topic TestMessage34Topic && \ kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 --partitions 1 --replication-factor 1 --topic TestMessageDelayTopic && \ + kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 --partitions 2 --replication-factor 1 --topic TestPartitionAssignmentTopic1 && \ + kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 --partitions 2 --replication-factor 1 --topic TestPartitionAssignmentTopic2 && \ exit'" environment: # The following settings are listed here only to satisfy the image's requirements.