Skip to content

Commit

Permalink
tests: Add consumer error event tests
Browse files Browse the repository at this point in the history
  • Loading branch information
erik-catalao committed Oct 13, 2023
1 parent ba50742 commit 9ecd1ca
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using KafkaFlow.IntegrationTests.Core.Handlers;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
internal class TriggerErrorMessageMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
MessageStorage.Add((byte[])context.Message.Value);
throw new Exception();

Check warning on line 15 in src/KafkaFlow.IntegrationTests/Core/Middlewares/TriggerErrorMessageMiddleware.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.IntegrationTests/Core/Middlewares/TriggerErrorMessageMiddleware.cs#L15

'System.Exception' should not be thrown by user code.
await next(context);
}
}
}
129 changes: 102 additions & 27 deletions src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@
public class GlobalEventsTest
{
private readonly Fixture fixture = new();
private string topic;
private bool isPartitionAssigned;

[TestInitialize]
public void Setup()
{
this.topic = $"GlobalEventsTestTopic_{Guid.NewGuid()}";

MessageStorage.Clear();
}

[TestMethod]
public async Task SubscribeGlobalEvents_AllEvents_TriggeredCorrectly()
{
Expand Down Expand Up @@ -51,7 +60,10 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
});
}

var provider = await this.GetServiceProviderAsync(ConfigureGlobalEvents);
var provider = await this.GetServiceProviderAsync(
ConfigureGlobalEvents,
this.ConfigureConsumer<GzipMiddleware>,
this.ConfigureProducer<ProtobufNetSerializer>);
MessageStorage.Clear();

var producer = provider.GetRequiredService<IMessageProducer<JsonProducer2>>();
Expand Down Expand Up @@ -83,7 +95,11 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
});
}

var provider = await this.GetServiceProviderAsync(ConfigureGlobalEvents);
var provider = await this.GetServiceProviderAsync(
ConfigureGlobalEvents,
this.ConfigureConsumer<GzipMiddleware>,
this.ConfigureProducer<ProtobufNetSerializer>);

MessageStorage.Clear();

var producer = provider.GetRequiredService<IMessageProducer<JsonProducer2>>();
Expand All @@ -97,11 +113,89 @@ void ConfigureGlobalEvents(IGlobalEvents observers)
Assert.AreEqual(messageContext.Message.Key, message.Id.ToString());
}

private async Task<IServiceProvider> GetServiceProviderAsync(Action<IGlobalEvents> configureGlobalEvents)
[TestMethod]
public async Task SubscribeGlobalEvents_ConsumerErrorEvent_TriggeredCorrectly()
{
this.isPartitionAssigned = false;
// Arrange
bool isMessageProducedStarted = false, isMessageConsumeStarted = false, isMessageConsumerError = false;

void ConfigureGlobalEvents(IGlobalEvents observers)
{
observers.MessageProduceStarted.Subscribe(eventContext =>
{
isMessageProducedStarted = true;
return Task.CompletedTask;
});

observers.MessageConsumeStarted.Subscribe(eventContext =>
{
isMessageConsumeStarted = true;
return Task.CompletedTask;
});

observers.MessageConsumeError.Subscribe(eventContext =>
{
isMessageConsumerError = true;
return Task.CompletedTask;
});
}

var topicName = $"GlobalEventsTestTopic_{Guid.NewGuid()}";
var provider = await this.GetServiceProviderAsync(
ConfigureGlobalEvents,
this.ConfigureConsumer<TriggerErrorMessageMiddleware>,
this.ConfigureProducer<ProtobufNetSerializer>);

MessageStorage.Clear();

var producer = provider.GetRequiredService<IMessageProducer<JsonProducer2>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

await Task.Delay(10000);

await MessageStorage.AssertMessageAsync(message);

// Assert
Assert.IsTrue(isMessageProducedStarted);
Assert.IsTrue(isMessageConsumeStarted);
Assert.IsTrue(isMessageConsumerError);
}

private void ConfigureConsumer<T>(IConsumerConfigurationBuilder consumerConfigurationBuilder)
where T : class, IMessageMiddleware
{
consumerConfigurationBuilder
.Topic(this.topic)
.WithGroupId(this.topic)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.Add<T>())
.WithPartitionsAssignedHandler((resolver, partitions) =>

Check notice on line 179 in src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs#L179

'resolver' is not used. Use discard parameter instead.
{
this.isPartitionAssigned = true;
});
}

private void ConfigureProducer<T>(IProducerConfigurationBuilder producerConfigurationBuilder)
where T : class, ISerializer
{
producerConfigurationBuilder
.DefaultTopic(this.topic)
.AddMiddlewares(middlewares => middlewares.AddSerializer<T>());
}

private async Task<IServiceProvider> GetServiceProviderAsync(
Action<IGlobalEvents> configureGlobalEvents,
Action<IConsumerConfigurationBuilder> consumerConfiguration,
Action<IProducerConfigurationBuilder> producerConfiguration)
{
this.isPartitionAssigned = false;

var builder = Host
.CreateDefaultBuilder()
Expand All @@ -123,28 +217,9 @@ private async Task<IServiceProvider> GetServiceProviderAsync(Action<IGlobalEvent
.AddCluster(
cluster => cluster
.WithBrokers(context.Configuration.GetValue<string>("Kafka:Brokers").Split(';'))
.CreateTopicIfNotExists(topicName, 1, 1)
.AddProducer<JsonProducer2>(
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()))
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId(topicName)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.Add<GzipMiddleware>())
.WithPartitionsAssignedHandler((resolver, partitions) =>
{
this.isPartitionAssigned = true;
})))
.CreateTopicIfNotExists(this.topic, 1, 1)
.AddProducer<JsonProducer2>(producerConfiguration)
.AddConsumer(consumerConfiguration))
.SubscribeGlobalEvents(configureGlobalEvents)))
.UseDefaultServiceProvider(
(_, options) =>
Expand Down

0 comments on commit 9ecd1ca

Please sign in to comment.