From 1c8e4c2238151ce2520dc1dcb843b200c4dd8d5b Mon Sep 17 00:00:00 2001 From: Rui Quintas Barbosa Date: Fri, 13 Oct 2023 16:31:20 +0100 Subject: [PATCH] tests: Include Polly --- .../GlobalEventsTest.cs | 42 +++++++++++++++---- .../KafkaFlow.IntegrationTests.csproj | 1 + 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs index cb8d6e0ee..dd6f01239 100644 --- a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs +++ b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs @@ -2,7 +2,7 @@ { using System; using System.IO; - using System.Threading; + using System.Linq; using System.Threading.Tasks; using AutoFixture; using KafkaFlow.Configuration; @@ -16,11 +16,13 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Polly; [TestClass] public class GlobalEventsTest { private readonly Fixture fixture = new(); + private bool isPartitionAssigned; [TestMethod] public async Task SubscribeGlobalEvents_AllEvents_TriggeredCorrectly() @@ -49,7 +51,7 @@ void ConfigureGlobalEvents(IGlobalEvents observers) }); } - var provider = this.GetServiceProvider(ConfigureGlobalEvents); + var provider = await this.GetServiceProviderAsync(ConfigureGlobalEvents); MessageStorage.Clear(); var producer = provider.GetRequiredService>(); @@ -67,7 +69,7 @@ void ConfigureGlobalEvents(IGlobalEvents observers) } [TestMethod] - public void SubscribeGlobalEvents_MessageContext_IsAssignedCorrectly() + public async Task SubscribeGlobalEvents_MessageContext_IsAssignedCorrectly() { // Arrange IMessageContext messageContext = null; @@ -81,7 +83,7 @@ void ConfigureGlobalEvents(IGlobalEvents observers) }); } - var provider = this.GetServiceProvider(ConfigureGlobalEvents); + var provider = await this.GetServiceProviderAsync(ConfigureGlobalEvents); MessageStorage.Clear(); var producer = provider.GetRequiredService>(); @@ -95,9 +97,11 @@ void ConfigureGlobalEvents(IGlobalEvents observers) Assert.AreEqual(messageContext.Message.Key, message.Id.ToString()); } - private IServiceProvider GetServiceProvider(Action configureGlobalEvents) + private async Task GetServiceProviderAsync(Action configureGlobalEvents) { - var topicName = $"topic_{Guid.NewGuid()}"; + this.isPartitionAssigned = false; + + var topicName = $"GlobalEventsTestTopic_{Guid.NewGuid()}"; var builder = Host .CreateDefaultBuilder() @@ -136,7 +140,11 @@ private IServiceProvider GetServiceProvider(Action configureGloba .AddMiddlewares( middlewares => middlewares .AddSerializer() - .Add()))) + .Add()) + .WithPartitionsAssignedHandler((resolver, partitions) => + { + this.isPartitionAssigned = true; + }))) .SubscribeGlobalEvents(configureGlobalEvents))) .UseDefaultServiceProvider( (_, options) => @@ -149,10 +157,26 @@ private IServiceProvider GetServiceProvider(Action configureGloba var bus = host.Services.CreateKafkaBus(); bus.StartAsync().GetAwaiter().GetResult(); - // Wait partition assignment - Thread.Sleep(10000); + await this.WaitForPartitionAssignmentAsync(); return host.Services; } + + private async Task WaitForPartitionAssignmentAsync() + { + var retryPolicy = Policy + .Handle() + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))); + + await retryPolicy.ExecuteAsync(() => + { + if (!this.isPartitionAssigned) + { + throw new Exception("Partition assignment hasn't occurred yet."); + } + + return Task.CompletedTask; + }); + } } } diff --git a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj index aa696ebb8..94944dc09 100644 --- a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj +++ b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj @@ -26,6 +26,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive +