From 433f337312930c671d10a44a62faad29b3779918 Mon Sep 17 00:00:00 2001 From: Joel Oliveira Date: Wed, 12 Jun 2024 18:08:33 +0100 Subject: [PATCH] feat: allow multiple OnStarted and OnStopping callback --- .../ClusterConfigurationBuilder.cs | 4 +- .../GlobalEventsTest.cs | 75 ++++++++++++++++--- 2 files changed, 68 insertions(+), 11 deletions(-) diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index cd85129ef..4867262cf 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -98,13 +98,13 @@ public IClusterConfigurationBuilder AddConsumer(Action handler) { - _onStoppingHandler = handler; + _onStoppingHandler += handler; return this; } public IClusterConfigurationBuilder OnStarted(Action handler) { - _onStartedHandler = handler; + _onStartedHandler += handler; return this; } diff --git a/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs index 6a87ac8a5..366a456cb 100644 --- a/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs +++ b/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs @@ -26,6 +26,7 @@ public class GlobalEventsTest private readonly Fixture _fixture = new(); private string _topic; private bool _isPartitionAssigned; + private IKafkaBus _bus; [TestInitialize] public void Setup() @@ -35,6 +36,52 @@ public void Setup() MessageStorage.Clear(); } + [TestMethod] + public async Task OnStarted_RegisterMultipleOnStartedCallbacks_AllAreCalled() + { + // Arrange + const int ExpectedOnStartedCount = 2; + var countOnStarted = 0; + + // Act + await this.GetServiceProviderAsync( + observers => { }, + this.ConfigureConsumer, + this.ConfigureProducer, + cluster => + { + cluster.OnStarted(_ => countOnStarted++); + cluster.OnStarted(_ => countOnStarted++); + }); + + // Assert + Assert.AreEqual(ExpectedOnStartedCount, countOnStarted); + } + + [TestMethod] + public async Task OnStopping_RegisterMultipleOnStoppingCallbacks_AllAreCalled() + { + // Arrange + const int ExpectedOnStoppingCount = 2; + var countOnStopping = 0; + + // Act + await this.GetServiceProviderAsync( + observers => { }, + this.ConfigureConsumer, + this.ConfigureProducer, + cluster => + { + cluster.OnStopping(_ => countOnStopping++); + cluster.OnStopping(_ => countOnStopping++); + }); + + await _bus?.StopAsync(); + + // Assert + Assert.AreEqual(ExpectedOnStoppingCount, countOnStopping); + } + [TestMethod] public async Task SubscribeGlobalEvents_AllEvents_TriggeredCorrectly() { @@ -241,10 +288,25 @@ private void ConfigureProducer(IProducerConfigurationBuilder producerConfigur private async Task GetServiceProviderAsync( Action configureGlobalEvents, Action consumerConfiguration, - Action producerConfiguration) + Action producerConfiguration, + Action builderConfiguration = null) { _isPartitionAssigned = false; + var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) => + { + cluster + .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')) + .CreateTopicIfNotExists(_topic, 1, 1) + .AddProducer(producerConfiguration) + .AddConsumer(consumerConfiguration); + }; + + clusterBuilderAction += (_, cluster) => + { + builderConfiguration?.Invoke(cluster); + }; + var builder = Host .CreateDefaultBuilder() .ConfigureAppConfiguration( @@ -262,12 +324,7 @@ private async Task GetServiceProviderAsync( services.AddKafka( kafka => kafka .UseLogHandler() - .AddCluster( - cluster => cluster - .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')) - .CreateTopicIfNotExists(_topic, 1, 1) - .AddProducer(producerConfiguration) - .AddConsumer(consumerConfiguration)) + .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); }) .SubscribeGlobalEvents(configureGlobalEvents))) .UseDefaultServiceProvider( (_, options) => @@ -277,8 +334,8 @@ private async Task GetServiceProviderAsync( }); var host = builder.Build(); - var bus = host.Services.CreateKafkaBus(); - bus.StartAsync().GetAwaiter().GetResult(); + _bus = host.Services.CreateKafkaBus(); + _bus.StartAsync().GetAwaiter().GetResult(); await this.WaitForPartitionAssignmentAsync();