Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get public repo up-to-date with latest improvements #20

Merged
merged 67 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
7defee2
Wait for consume task to complete when disposing EventuateKafkaConsumer
douggish Aug 25, 2021
6b6b376
Explicitly close the Kafka consumer
douggish Dec 2, 2021
8cf63a5
Pin cp-kafka and cp-zookeeper at version 7.0.0
douggish Sep 15, 2023
94ec87f
Stop the dispatchers when closing kafka message consumers
douggish Mar 11, 2022
7b96c6f
Handle case of new SwimlaneDispatcher after dispatchers are stopped
douggish Mar 14, 2022
b5660ef
Unsubscribe when Initializer hosted service is stopped
douggish Mar 14, 2022
385576e
Avoid unnecessary reference to AspNetCore library
douggish Sep 15, 2023
b004737
Fix potential deadlock issue in SwimlaneDispatcher
douggish Mar 15, 2022
d77a7ae
Fix line endings in shell scripts
thomaszylstra Mar 25, 2022
b129fe4
Add integration test for event processing during a shutdown
thomaszylstra Mar 28, 2022
ca18a7a
Introduce constant number of events in multi-event test
thomaszylstra Mar 29, 2022
95df5e5
Add DevSetup script
douggish Sep 18, 2023
a3296c6
Upgrade Confluent.Kafka dependency to 1.9.3
douggish Sep 15, 2023
2979ad2
Increase timeout for failing test
thomaszylstra Oct 3, 2022
53b2b2e
Delete records rather than topics during test cleanup
thomaszylstra Oct 3, 2022
ee02bd1
Increase timeout for publish multiple test
thomaszylstra Oct 4, 2022
18ba6e1
Pre-create topics for integration tests
douggish Sep 15, 2023
2f1164b
Remove unneeded configuration parameter
thomaszylstra Oct 5, 2022
0032c72
Update solution to .NET 6.0
douggish Sep 18, 2023
b950fef
Update nuget dependencies for .NET 6
douggish Sep 15, 2023
58f6a04
Fix failed unit tests
douggish Sep 15, 2023
590eecb
Update dependencies
thomaszylstra Oct 6, 2022
1575e50
Add partitions to kafka test setup
thomaszylstra Oct 6, 2022
083c8bc
Port java code to dotnet
thomaszylstra Nov 15, 2022
e073a0e
Initialize BackPressure configuration
thomaszylstra Nov 16, 2022
4caba87
Add integration test for backpressure performance
thomaszylstra Nov 17, 2022
9e6a5ef
Use ISet interface instead of explicit class
thomaszylstra Nov 18, 2022
3980156
Change accessibility of some fields
thomaszylstra Nov 18, 2022
3837977
Use poll timeout from configuration properties in KafkaConsumer
thomaszylstra Nov 18, 2022
7814702
Refactor backpressure test to include multiple message types
thomaszylstra Nov 22, 2022
fdd39a5
Add logging for backlog state
thomaszylstra Nov 22, 2022
5a1ba58
Add logging for dispatcher queue size
thomaszylstra Nov 22, 2022
7ad08ee
Fix issue with BackPressureManagerPausedState
douggish Nov 22, 2022
29b922f
Add unit tests for BackpressureManager state transitions
thomaszylstra Nov 22, 2022
df0c3a8
Remove extra logging messages
thomaszylstra Nov 22, 2022
b03824c
Add configuration property information to README
thomaszylstra Nov 22, 2022
1f51ad0
Use EquivalentTo for testing collection membership
thomaszylstra Nov 22, 2022
f880337
Add default property values to README
thomaszylstra Nov 22, 2022
1b8df34
Use immutable collections for BackPressureActions
thomaszylstra Nov 22, 2022
67e9f2e
Support async message handling
douggish Feb 24, 2023
c961930
Don't get scoped service from root service provider in tests
douggish Sep 18, 2023
e772ae0
Setup transaction scope properly for async flow
douggish Mar 22, 2023
40db56c
Clean up some todos
douggish Mar 25, 2023
c576d8e
Add support for async domainEventHandlersFactory
douggish Mar 25, 2023
fea5256
Try to implement StopAsync for DomainEventDispatcher
douggish May 19, 2023
4d2d3d8
Pass through cancellation token
douggish May 19, 2023
a558f72
Change SwimlaneBasedDispatcher Dispatch method to async
douggish May 30, 2023
aeec497
Implement IAsyncDisposable in KafkaMessageConsumer
douggish May 30, 2023
f0161db
Implement IAsyncDisposable in EventuateKafkaConsumer
douggish May 31, 2023
8eb4bbb
Update dependencies
douggish May 31, 2023
8e750a2
Make sure we dispose Kafka consumer
douggish Jun 1, 2023
4856680
Log different error if message handler throws canceled exception
douggish Jun 1, 2023
1c352ff
Test handler canceled
douggish Jun 1, 2023
b89622c
Remove unused field
douggish Jun 2, 2023
59dc229
Add some locking in KafkaMessageConsumer
douggish Jun 2, 2023
5623ec8
Stop DomainEventDispatchers in series
douggish Jun 5, 2023
e3ec1d7
Add partition rebalancing test
douggish Sep 12, 2023
890eef0
Update readme examples to align with async handling changes
douggish Sep 20, 2023
5c3a276
Upgrade github action versions
douggish Sep 20, 2023
1aa0f90
Add a changelog
douggish Sep 20, 2023
4783732
Add changelog file to the "Solution Items"
douggish Sep 20, 2023
dc95113
Add async suffix to test cleanup method name
douggish Sep 20, 2023
bd75999
Add some documentation to QueuedMessage
douggish Sep 20, 2023
a4f6694
Rename some fields to improve clarity
douggish Sep 20, 2023
6f73b93
Change default BackPressure PauseThreshold to uint.MaxValue
douggish Sep 20, 2023
c6c81af
Rename file to match interface name
douggish Sep 20, 2023
12ed210
Clarify readme regarding Kafka consumer configuration
douggish Sep 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Auto detect text files and perform LF normalization
* text=auto
* text=auto
*.sh text eol=lf
14 changes: 7 additions & 7 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ jobs:

steps:
- name: Checkout source
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Setup .NET Core
uses: actions/setup-dotnet@v1
uses: actions/setup-dotnet@v3
with:
dotnet-version: 3.1.201
dotnet-version: 6.0.414

- name: Build
env:
Expand All @@ -29,7 +29,7 @@ jobs:
run: ./build.sh

- name: Upload bin folder
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
with:
name: bin
path: IO.Eventuate.Tram/bin
Expand All @@ -38,7 +38,7 @@ jobs:
run: dotnet test -c Release --no-build --verbosity normal --logger trx IO.Eventuate.Tram.UnitTests/

- name: Upload unit test results
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
if: always()
with:
name: unit-test-results
Expand All @@ -57,11 +57,11 @@ jobs:
docker stats --no-stream --all

- name: Upload integration test results
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
if: always()
with:
name: integration-test-results
path: IO.Eventuate.Tram.IntegrationTests/bin/Release/netcoreapp3.1/TestResults
path: IO.Eventuate.Tram.IntegrationTests/bin/Release/net6.0/TestResults

- name: Publish nuget package
# Don't publish nuget packages for builds triggered by pull requests (pull requests from forks won't have access to secrets anyway)
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Initial port of Eventuate Tram in .NET supporting the following functionality:
- messaging - send and receive messages over named channels
- events - publish domain events and subscribe to domain events

[Unreleased]: https://github.com/eventuate-tram/eventuate-tram-core-dotnet/commits/HEAD
18 changes: 18 additions & 0 deletions DevSetup.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Set-Item -Path Env:CDC_SERVICE_DOCKER_VERSION -Value ("0.6.0.RELEASE")
Push-Location -Path IO.Eventuate.Tram.IntegrationTests

docker compose down --remove-orphans
docker compose up -d mssql
docker compose up -d zookeeper
docker compose up -d kafka

Start-Sleep -Seconds 40
docker stats --no-stream

docker-compose up --exit-code-from kafka-setup kafka-setup
docker compose up --exit-code-from dbsetup dbsetup
docker compose up -d cdcservice

docker compose up -d kafka-ui

Pop-Location
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>

Expand All @@ -23,13 +23,13 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.0" />
<PackageReference Include="Confluent.Kafka" Version="1.4.0" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.16" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageReference Include="Confluent.Kafka" Version="2.1.1" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using IO.Eventuate.Tram.Events.Common;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Local.Kafka.Consumer;
using NUnit.Framework;

namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures;

public class BackpressureTests : IntegrationTestsBase
{
private const uint PauseThreshold = 20;
private const uint ResumeThreshold = 5;

[SetUp]
public async Task Setup()
{
await CleanupKafkaTopics();
// Initialize the backpressure properties
var properties = new EventuateKafkaConsumerConfigurationProperties
{
BackPressure =
{
PauseThreshold = PauseThreshold,
ResumeThreshold = ResumeThreshold
}
};
TestSetup("eventuate", false, properties);
await CleanupTestAsync();
}

[TearDown]
public void TearDown()
{
DisposeTestHost();
}

[Test]
public void PublishWithBackpressure_Send1000Messages_AllMessagesEventuallyProcessed()
{
// Arrange
uint messagesPerType = 250;
uint totalMessages = messagesPerType * 4;
TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2);
TestMessageType2 msg2 = new TestMessageType2("Msg2", 2);
TestMessageType3 msg3 = new TestMessageType3("Msg3", 3);
TestMessageType4 msg4 = new TestMessageType4("Msg4", 4);

for (int x = 0; x < messagesPerType; x++)
{
GetTestPublisher().Publish(AggregateType12, AggregateType12, new List<IDomainEvent> { msg1 });
GetTestPublisher().Publish(AggregateType12, AggregateType12, new List<IDomainEvent> { msg2 });
GetTestPublisher().Publish(AggregateType34, AggregateType34, new List<IDomainEvent> { msg3 });
GetTestPublisher().Publish(AggregateType34, AggregateType34, new List<IDomainEvent> { msg4 });
}

// Act
TestEventConsumer consumer = GetTestConsumer();

// Allow time for messages to process
int count = 300;
while (consumer.TotalMessageCount() < totalMessages && count > 0)
{
TestContext.WriteLine($"TotalMessageCount: {consumer.TotalMessageCount()} ({count})");
Thread.Sleep(1000);
count--;
}
ShowTestResults();

// Assert
Assert.That(GetDbContext().Messages.Count(),
Is.EqualTo(totalMessages), "Number of messages produced");
Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0),
Is.EqualTo(0), "Number of unpublished messages");
foreach (var eventType in new[] { typeof(TestMessageType1), typeof(TestMessageType2), typeof(TestMessageType3), typeof(TestMessageType4) })
{
TestEventConsumer.EventStatistics eventStatistics = consumer.GetEventStatistics(eventType);
Assert.That(eventStatistics.MessageCount,
Is.EqualTo(messagesPerType), $"Number of {eventType.Name} messages received by consumer");
Assert.That(eventStatistics.ReceivedMessages.Count,
Is.EqualTo(messagesPerType), $"Number of received {eventType.Name} messages");
}

Assert.That(consumer.TotalMessageCount(),
Is.EqualTo(totalMessages), "Total number of messages received by consumer");
Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null),
Is.EqualTo(totalMessages), "Number of received messages");
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using IO.Eventuate.Tram.Database;
using IO.Eventuate.Tram.Events.Publisher;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Local.Kafka.Consumer;
using IO.Eventuate.Tram.Messaging.Common;
using IO.Eventuate.Tram.Messaging.Consumer;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -24,11 +23,16 @@ public class IntegrationTestsBase
private string _subscriberId = "xx";
protected const string AggregateType12 = "TestMessage12Topic";
protected const string AggregateType34 = "TestMessage34Topic";
protected const string AggregateTypeDelay = "TestMessageDelayTopic";
protected const string TestPartitionAssignmentTopic1 = "TestPartitionAssignmentTopic1";
protected const string TestPartitionAssignmentTopic2 = "TestPartitionAssignmentTopic2";
protected string EventuateDatabaseSchemaName = "eventuate";
public static string PingFileName = "ping.txt";

protected TestSettings TestSettings;

private static IHost _host;
private static IServiceScope _testServiceScope;
private static EventuateTramDbContext _dbContext;
private static IDomainEventPublisher _domainEventPublisher;
private static TestEventConsumer _testEventConsumer;
Expand Down Expand Up @@ -57,45 +61,46 @@ protected void TestSetup(string schema, bool withInterceptor, EventuateKafkaCons
{
EventuateDatabaseSchemaName = schema;
_subscriberId = Guid.NewGuid().ToString();

// Clear the ping file
File.WriteAllText(PingFileName, string.Empty);

if (_host == null)
{
_host = SetupTestHost(withInterceptor, consumerConfigProperties);
_dbContext = _host.Services.GetService<EventuateTramDbContext>();
_domainEventPublisher = _host.Services.GetService<IDomainEventPublisher>();
_testEventConsumer = _host.Services.GetService<TestEventConsumer>();
_interceptor = (TestMessageInterceptor)_host.Services.GetService<IMessageInterceptor>();
var scopeFactory = _host.Services.GetRequiredService<IServiceScopeFactory>();
_testServiceScope = scopeFactory.CreateScope();
_dbContext = _testServiceScope.ServiceProvider.GetRequiredService<EventuateTramDbContext>();
_domainEventPublisher = _testServiceScope.ServiceProvider.GetRequiredService<IDomainEventPublisher>();
_testEventConsumer = _testServiceScope.ServiceProvider.GetRequiredService<TestEventConsumer>();
_interceptor = (TestMessageInterceptor)_testServiceScope.ServiceProvider.GetService<IMessageInterceptor>();
}
}

protected void CleanupTest()
protected async Task CleanupTestAsync()
{
ClearDb(GetDbContext(), EventuateDatabaseSchemaName);
await ClearDbAsync(GetDbContext(), EventuateDatabaseSchemaName);
GetTestConsumer().Reset();
GetTestMessageInterceptor()?.Reset();
}

protected async Task CleanupKafka()
protected async Task CleanupKafkaTopics()
{
var config = new AdminClientConfig();
config.BootstrapServers = TestSettings.KafkaBootstrapServers;
try
{
using (var admin = new AdminClientBuilder(config).Build())
{
await admin.DeleteTopicsAsync(new[] {AggregateType12, AggregateType34});
}
}
catch (DeleteTopicsException e)
using var admin = new AdminClientBuilder(config).Build();
Metadata kafkaMetadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
foreach (var topic in new[] {AggregateType12, AggregateType34, AggregateTypeDelay, TestPartitionAssignmentTopic1, TestPartitionAssignmentTopic2})
{
// Don't fail if topic wasn't found (nothing to delete)
if (e.Results.Where(r => r.Error.IsError).All(r => r.Error.Code == ErrorCode.UnknownTopicOrPart))
TopicMetadata paMessagesMetadata = kafkaMetadata.Topics.Find(t => t.Topic.Equals(topic));
if (paMessagesMetadata != null)
{
TestContext.WriteLine(e.Message);
await admin.DeleteRecordsAsync(paMessagesMetadata.Partitions.Select(p => new TopicPartitionOffset(new TopicPartition(
topic, p.PartitionId), Offset.End)));
}
else
{
throw;
TestContext.WriteLine($"Topic {topic} did not exist.");
}
}
}
Expand All @@ -109,6 +114,7 @@ protected void ShowTestResults()
TestContext.WriteLine(" Dispatcher Id: {0}", _subscriberId);
TestContext.WriteLine(" Aggregate Type 12: {0}", AggregateType12);
TestContext.WriteLine(" Aggregate Type 34: {0}", AggregateType34);
TestContext.WriteLine(" Aggregate Type Delay: {0}", AggregateTypeDelay);

TestContext.WriteLine("Test Results");
TestContext.WriteLine(" N Messages in DB: {0}", _dbContext.Messages.Count());
Expand Down Expand Up @@ -166,7 +172,7 @@ protected IHost SetupTestHost(bool withInterceptor, EventuateKafkaConsumerConfig
provider =>
{
var consumer = provider.GetRequiredService<TestEventConsumer>();
return consumer.DomainEventHandlers(AggregateType12, AggregateType34);
return consumer.DomainEventHandlers(AggregateType12, AggregateType34, AggregateTypeDelay);
})
.SetConsumerConfigProperties(consumerConfigProperties)
.Build<TestEventConsumer>(withInterceptor);
Expand All @@ -179,8 +185,7 @@ protected void DisposeTestHost()
if (_host == null)
return;

var messageConsumer = _host.Services.GetService<IMessageConsumer>();
messageConsumer.Close();
_testServiceScope.Dispose();
_host.StopAsync().Wait();
_host.Dispose();
_host = null;
Expand Down Expand Up @@ -209,10 +214,10 @@ protected EventuateTramDbContext GetDbContext()
return _dbContext;
}

protected void ClearDb(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName)
protected static async Task ClearDbAsync(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName)
{
dbContext.Database.ExecuteSqlRaw(String.Format("Delete from [{0}].[message]", eventuateDatabaseSchemaName));
dbContext.Database.ExecuteSqlRaw(String.Format("Delete from [{0}].[received_messages]", eventuateDatabaseSchemaName));
await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[message]");
await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[received_messages]");
}
}
}
Loading