Skip to content

Commit

Permalink
Merge branch 'feat/include-open-telemetry' of https://github.com/Farf…
Browse files Browse the repository at this point in the history
…etch/kafkaflow into feat/include-open-telemetry
  • Loading branch information
simaoribeiro committed Oct 13, 2023
2 parents 9d46d65 + 1c8e4c2 commit ea13246
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
42 changes: 33 additions & 9 deletions src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
using System;
using System.IO;
using System.Threading;
using System.Linq;
using System.Threading.Tasks;
using AutoFixture;
using KafkaFlow.Configuration;
Expand All @@ -16,11 +16,13 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Polly;

[TestClass]
public class GlobalEventsTest
{
private readonly Fixture fixture = new();
private bool isPartitionAssigned;

[TestMethod]
public async Task SubscribeGlobalEvents_AllEvents_TriggeredCorrectly()
Expand Down Expand Up @@ -49,7 +51,7 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
});
}

var provider = this.GetServiceProvider(ConfigureGlobalEvents);
var provider = await this.GetServiceProviderAsync(ConfigureGlobalEvents);
MessageStorage.Clear();

var producer = provider.GetRequiredService<IMessageProducer<JsonProducer2>>();
Expand All @@ -67,7 +69,7 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
}

[TestMethod]
public void SubscribeGlobalEvents_MessageContext_IsAssignedCorrectly()
public async Task SubscribeGlobalEvents_MessageContext_IsAssignedCorrectly()
{
// Arrange
IMessageContext messageContext = null;
Expand All @@ -81,7 +83,7 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
});
}

var provider = this.GetServiceProvider(ConfigureGlobalEvents);
var provider = await this.GetServiceProviderAsync(ConfigureGlobalEvents);
MessageStorage.Clear();

var producer = provider.GetRequiredService<IMessageProducer<JsonProducer2>>();
Expand All @@ -95,9 +97,11 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
Assert.AreEqual(messageContext.Message.Key, message.Id.ToString());
}

private IServiceProvider GetServiceProvider(Action<IGlobalEvents> configureGlobalEvents)
private async Task<IServiceProvider> GetServiceProviderAsync(Action<IGlobalEvents> configureGlobalEvents)
{
var topicName = $"topic_{Guid.NewGuid()}";
this.isPartitionAssigned = false;

var topicName = $"GlobalEventsTestTopic_{Guid.NewGuid()}";

var builder = Host
.CreateDefaultBuilder()
Expand Down Expand Up @@ -136,7 +140,11 @@ private IServiceProvider GetServiceProvider(Action<IGlobalEvents> configureGloba
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.Add<GzipMiddleware>())))
.Add<GzipMiddleware>())
.WithPartitionsAssignedHandler((resolver, partitions) =>
{
this.isPartitionAssigned = true;
})))
.SubscribeGlobalEvents(configureGlobalEvents)))
.UseDefaultServiceProvider(
(_, options) =>
Expand All @@ -149,10 +157,26 @@ private IServiceProvider GetServiceProvider(Action<IGlobalEvents> configureGloba
var bus = host.Services.CreateKafkaBus();
bus.StartAsync().GetAwaiter().GetResult();

// Wait partition assignment
Thread.Sleep(10000);
await this.WaitForPartitionAssignmentAsync();

return host.Services;
}

private async Task WaitForPartitionAssignmentAsync()
{
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))));

await retryPolicy.ExecuteAsync(() =>
{
if (!this.isPartitionAssigned)
{
throw new Exception("Partition assignment hasn't occurred yet.");
}
return Task.CompletedTask;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="OpenTelemetry.Exporter.InMemory" Version="1.6.0" />
<PackageReference Include="Polly" Version="8.0.0" />
</ItemGroup>

<ItemGroup Condition="$([MSBuild]::IsOsPlatform('OSX'))">
Expand Down

0 comments on commit ea13246

Please sign in to comment.