From 09f5b2b3849773c528de81c89fc27f028717591a Mon Sep 17 00:00:00 2001 From: Doug Gish Date: Thu, 2 May 2024 13:54:39 -0600 Subject: [PATCH] Upgrade to .NET 8 - Changed project targets to net8.0 - Upgraded package dependencies to the latest released versions - Changed from spaces to tab indentation in some files --- .github/workflows/build.yml | 4 +- .../IO.Eventuate.Tram.IntegrationTests.csproj | 14 +- .../TestFixtures/BadSchemaIntegrationTests.cs | 54 +- .../TestFixtures/IntegrationTestsBase.cs | 421 ++++----- .../TestFixtures/PerformanceTests.cs | 96 +- .../ProducerConsumerIntegrationTests.cs | 827 +++++++++--------- ...amicEventuateSchemaModelCacheKeyFactory.cs | 2 +- .../TestHelpers/TestEventConsumer.cs | 282 +++--- .../TestHelpers/TestHostBuilder.cs | 151 ++-- .../TestHelpers/TestMessageInterceptor.cs | 139 +-- .../TestHelpers/TestMessageType1.cs | 42 +- .../TestHelpers/TestMessageType2.cs | 43 +- .../TestHelpers/TestMessageType3.cs | 22 +- .../TestHelpers/TestMessageType4.cs | 22 +- .../TestHelpers/TestMessageTypeDelay.cs | 22 +- .../TestMessageUnsubscribedType.cs | 19 +- .../TestSettings.cs | 21 +- .../docker-compose.yml | 6 +- .../testsettings.json | 2 +- .../Subscriber/DomainEventDispatcherTests.cs | 97 +- .../IO.Eventuate.Tram.UnitTests.csproj | 8 +- .../Producer/AbstractMessageProducerTests.cs | 66 +- .../Producer/HttpDateHeaderFormatUtilTests.cs | 18 +- .../Messaging/Producer/MessageBuilderTests.cs | 6 +- IO.Eventuate.Tram/IO.Eventuate.Tram.csproj | 10 +- README.md | 2 +- 26 files changed, 1215 insertions(+), 1181 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8e2aa82..f753c98 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,7 +19,7 @@ jobs: - name: Setup .NET Core uses: actions/setup-dotnet@v3 with: - dotnet-version: 6.0.414 + dotnet-version: 8.0.204 - name: Build env: @@ -61,7 +61,7 @@ jobs: if: always() with: name: integration-test-results - path: IO.Eventuate.Tram.IntegrationTests/bin/Release/net6.0/TestResults + path: IO.Eventuate.Tram.IntegrationTests/bin/Release/net8.0/TestResults - name: Publish nuget package # Don't publish nuget packages for builds triggered by pull requests (pull requests from forks won't have access to secrets anyway) diff --git a/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj b/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj index 417b864..5e53f82 100644 --- a/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj +++ b/IO.Eventuate.Tram.IntegrationTests/IO.Eventuate.Tram.IntegrationTests.csproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 false @@ -23,12 +23,12 @@ - - - - - - + + + + + + diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs index 28764e1..b59eaf9 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/BadSchemaIntegrationTests.cs @@ -7,33 +7,33 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures { - [TestFixture] - public class BadSchemaIntegrationTests : IntegrationTestsBase - { - [SetUp] - public void Setup() - { - TestSetup("badschema", false, EventuateKafkaConsumerConfigurationProperties.Empty()); - } + [TestFixture] + public class BadSchemaIntegrationTests : IntegrationTestsBase + { + [SetUp] + public void Setup() + { + TestSetup("badschema", false, EventuateKafkaConsumerConfigurationProperties.Empty()); + } - [TearDown] - public void TearDown() - { - DisposeTestHost(); - } + [TearDown] + public void TearDown() + { + DisposeTestHost(); + } - [Test] - public void Publish_DatabaseSchemaNotCreated_ThrowsException() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestEventConsumer consumer = GetTestConsumer(); + [Test] + public void Publish_DatabaseSchemaNotCreated_ThrowsException() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); - // Act - Assert.Throws(delegate () - { - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); - }); - } - } -} + // Act + Assert.Throws(delegate() + { + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + }); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs index 3fee31f..e0bdf59 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/IntegrationTestsBase.cs @@ -17,207 +17,220 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures { - public class IntegrationTestsBase - { - private const string TestSettingsFile = "testsettings.json"; - private string _subscriberId = "xx"; - protected const string AggregateType12 = "TestMessage12Topic"; - protected const string AggregateType34 = "TestMessage34Topic"; - protected const string AggregateTypeDelay = "TestMessageDelayTopic"; - protected const string TestPartitionAssignmentTopic1 = "TestPartitionAssignmentTopic1"; - protected const string TestPartitionAssignmentTopic2 = "TestPartitionAssignmentTopic2"; - protected string EventuateDatabaseSchemaName = "eventuate"; - public static string PingFileName = "ping.txt"; - - protected TestSettings TestSettings; - - private static IHost _host; - private static IServiceScope _testServiceScope; - private static EventuateTramDbContext _dbContext; - private static IDomainEventPublisher _domainEventPublisher; - private static TestEventConsumer _testEventConsumer; - private static TestMessageInterceptor _interceptor; - - public IntegrationTestsBase() - { - IConfigurationRoot configuration; - try - { - IConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); - ConfigureFromEnvironmentAndSettingsFile(configurationBuilder); - configuration = configurationBuilder.Build(); - } - catch - { - IConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); - ConfigureFromEnvironment(configurationBuilder); - configuration = configurationBuilder.Build(); - } - - TestSettings = configuration.Get(); - } - - protected void TestSetup(string schema, bool withInterceptor, EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) - { - EventuateDatabaseSchemaName = schema; - _subscriberId = Guid.NewGuid().ToString(); - - // Clear the ping file - File.WriteAllText(PingFileName, string.Empty); - - if (_host == null) - { - _host = SetupTestHost(withInterceptor, consumerConfigProperties); - var scopeFactory = _host.Services.GetRequiredService(); - _testServiceScope = scopeFactory.CreateScope(); - _dbContext = _testServiceScope.ServiceProvider.GetRequiredService(); - _domainEventPublisher = _testServiceScope.ServiceProvider.GetRequiredService(); - _testEventConsumer = _testServiceScope.ServiceProvider.GetRequiredService(); - _interceptor = (TestMessageInterceptor)_testServiceScope.ServiceProvider.GetService(); - } - } - - protected async Task CleanupTestAsync() - { - await ClearDbAsync(GetDbContext(), EventuateDatabaseSchemaName); - GetTestConsumer().Reset(); - GetTestMessageInterceptor()?.Reset(); - } - - protected async Task CleanupKafkaTopicsAsync() - { - var config = new AdminClientConfig(); - config.BootstrapServers = TestSettings.KafkaBootstrapServers; - using var admin = new AdminClientBuilder(config).Build(); - Metadata kafkaMetadata = admin.GetMetadata(TimeSpan.FromSeconds(10)); - foreach (var topic in new[] {AggregateType12, AggregateType34, AggregateTypeDelay, TestPartitionAssignmentTopic1, TestPartitionAssignmentTopic2}) - { - TopicMetadata paMessagesMetadata = kafkaMetadata.Topics.Find(t => t.Topic.Equals(topic)); - if (paMessagesMetadata != null) - { - await admin.DeleteRecordsAsync(paMessagesMetadata.Partitions.Select(p => new TopicPartitionOffset(new TopicPartition( - topic, p.PartitionId), Offset.End))); - } - else - { - TestContext.WriteLine($"Topic {topic} did not exist."); - } - } - } - - protected void ShowTestResults() - { - TestContext.WriteLine("Test Config"); - TestContext.WriteLine(" Connection String: {0}", TestSettings.ConnectionStrings.EventuateTramDbConnection); - TestContext.WriteLine(" Kafka server: {0}", TestSettings.KafkaBootstrapServers); - TestContext.WriteLine(" Schema: {0}", EventuateDatabaseSchemaName); - TestContext.WriteLine(" Dispatcher Id: {0}", _subscriberId); - TestContext.WriteLine(" Aggregate Type 12: {0}", AggregateType12); - TestContext.WriteLine(" Aggregate Type 34: {0}", AggregateType34); - TestContext.WriteLine(" Aggregate Type Delay: {0}", AggregateTypeDelay); - - TestContext.WriteLine("Test Results"); - TestContext.WriteLine(" N Messages in DB: {0}", _dbContext.Messages.Count()); - TestContext.WriteLine(" Unpublished Count: {0}", _dbContext.Messages.Count(msg => msg.Published == 0)); - TestContext.WriteLine(" N Received in DB: {0}", _dbContext.ReceivedMessages.Count(msg => msg.MessageId != null)); - foreach (Type eventType in _testEventConsumer.GetEventTypes()) - { - TestContext.WriteLine($" Received {eventType.Name} {_testEventConsumer.GetEventStatistics(eventType).MessageCount}"); - } - TestContext.WriteLine(" Exception Count: {0}", _testEventConsumer.ExceptionCount); - - if (_interceptor != null) - { - TestContext.WriteLine("Message Interceptor Counts"); - TestContext.WriteLine(" Pre Send: {0}", _interceptor.PreSendCount); - TestContext.WriteLine(" Post Send: {0}", _interceptor.PostSendCount); - TestContext.WriteLine(" Pre Receive: {0}", _interceptor.PreReceiveCount); - TestContext.WriteLine(" Post Receive: {0}", _interceptor.PostReceiveCount); - TestContext.WriteLine(" Pre Handle: {0}", _interceptor.PreHandleCount); - TestContext.WriteLine(" Post Handle: {0}", _interceptor.PostHandleCount); - } - } - - /// - /// Set up the configuration for the HostBuilder - /// - protected void ConfigureFromEnvironmentAndSettingsFile(IConfigurationBuilder config, - Dictionary overrides = null) - { - config - .AddJsonFile(TestSettingsFile, false) - .AddEnvironmentVariables() - .AddInMemoryCollection(overrides); - } - - /// - /// Set up the configuration for the HostBuilder - /// - protected void ConfigureFromEnvironment(IConfigurationBuilder config, - Dictionary overrides = null) - { - config - .AddEnvironmentVariables() - .AddInMemoryCollection(overrides); - } - - protected IHost SetupTestHost(bool withInterceptor, EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) - { - var host = new TestHostBuilder() - .SetConnectionString(TestSettings.ConnectionStrings.EventuateTramDbConnection) - .SetEventuateDatabaseSchemaName(EventuateDatabaseSchemaName) - .SetKafkaBootstrapServers(TestSettings.KafkaBootstrapServers) - .SetSubscriberId(_subscriberId) - .SetDomainEventHandlersFactory( - provider => - { - var consumer = provider.GetRequiredService(); - return consumer.DomainEventHandlers(AggregateType12, AggregateType34, AggregateTypeDelay); - }) - .SetConsumerConfigProperties(consumerConfigProperties) - .Build(withInterceptor); - host.StartAsync().Wait(); - return host; - } - - protected void DisposeTestHost() - { - if (_host == null) - return; - - _testServiceScope.Dispose(); - _host.StopAsync().Wait(); - _host.Dispose(); - _host = null; - _dbContext = null; - _domainEventPublisher = null; - _testEventConsumer = null; - } - - protected TestEventConsumer GetTestConsumer() - { - return _testEventConsumer; - } - - protected TestMessageInterceptor GetTestMessageInterceptor() - { - return _interceptor; - } - - protected IDomainEventPublisher GetTestPublisher() - { - return _domainEventPublisher; - } - - protected EventuateTramDbContext GetDbContext() - { - return _dbContext; - } - - protected static async Task ClearDbAsync(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName) - { - await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[message]"); - await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[received_messages]"); - } - } -} + public class IntegrationTestsBase + { + private const string TestSettingsFile = "testsettings.json"; + private string _subscriberId = "xx"; + protected const string AggregateType12 = "TestMessage12Topic"; + protected const string AggregateType34 = "TestMessage34Topic"; + protected const string AggregateTypeDelay = "TestMessageDelayTopic"; + protected const string TestPartitionAssignmentTopic1 = "TestPartitionAssignmentTopic1"; + protected const string TestPartitionAssignmentTopic2 = "TestPartitionAssignmentTopic2"; + protected string EventuateDatabaseSchemaName = "eventuate"; + public static string PingFileName = "ping.txt"; + + protected TestSettings TestSettings; + + private static IHost _host; + private static IServiceScope _testServiceScope; + private static EventuateTramDbContext _dbContext; + private static IDomainEventPublisher _domainEventPublisher; + private static TestEventConsumer _testEventConsumer; + private static TestMessageInterceptor _interceptor; + + public IntegrationTestsBase() + { + IConfigurationRoot configuration; + try + { + IConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); + ConfigureFromEnvironmentAndSettingsFile(configurationBuilder); + configuration = configurationBuilder.Build(); + } + catch + { + IConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); + ConfigureFromEnvironment(configurationBuilder); + configuration = configurationBuilder.Build(); + } + + TestSettings = configuration.Get(); + } + + protected void TestSetup(string schema, bool withInterceptor, + EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) + { + EventuateDatabaseSchemaName = schema; + _subscriberId = Guid.NewGuid().ToString(); + + // Clear the ping file + File.WriteAllText(PingFileName, string.Empty); + + if (_host == null) + { + _host = SetupTestHost(withInterceptor, consumerConfigProperties); + var scopeFactory = _host.Services.GetRequiredService(); + _testServiceScope = scopeFactory.CreateScope(); + _dbContext = _testServiceScope.ServiceProvider.GetRequiredService(); + _domainEventPublisher = _testServiceScope.ServiceProvider.GetRequiredService(); + _testEventConsumer = _testServiceScope.ServiceProvider.GetRequiredService(); + _interceptor = + (TestMessageInterceptor)_testServiceScope.ServiceProvider.GetService(); + } + } + + protected async Task CleanupTestAsync() + { + await ClearDbAsync(GetDbContext(), EventuateDatabaseSchemaName); + GetTestConsumer().Reset(); + GetTestMessageInterceptor()?.Reset(); + } + + protected async Task CleanupKafkaTopicsAsync() + { + var config = new AdminClientConfig(); + config.BootstrapServers = TestSettings.KafkaBootstrapServers; + using var admin = new AdminClientBuilder(config).Build(); + Metadata kafkaMetadata = admin.GetMetadata(TimeSpan.FromSeconds(10)); + foreach (var topic in new[] + { + AggregateType12, AggregateType34, AggregateTypeDelay, TestPartitionAssignmentTopic1, + TestPartitionAssignmentTopic2 + }) + { + TopicMetadata paMessagesMetadata = kafkaMetadata.Topics.Find(t => t.Topic.Equals(topic)); + if (paMessagesMetadata != null) + { + await admin.DeleteRecordsAsync(paMessagesMetadata.Partitions.Select(p => + new TopicPartitionOffset(new TopicPartition( + topic, p.PartitionId), Offset.End))); + } + else + { + TestContext.WriteLine($"Topic {topic} did not exist."); + } + } + } + + protected void ShowTestResults() + { + TestContext.WriteLine("Test Config"); + TestContext.WriteLine(" Connection String: {0}", TestSettings.ConnectionStrings.EventuateTramDbConnection); + TestContext.WriteLine(" Kafka server: {0}", TestSettings.KafkaBootstrapServers); + TestContext.WriteLine(" Schema: {0}", EventuateDatabaseSchemaName); + TestContext.WriteLine(" Dispatcher Id: {0}", _subscriberId); + TestContext.WriteLine(" Aggregate Type 12: {0}", AggregateType12); + TestContext.WriteLine(" Aggregate Type 34: {0}", AggregateType34); + TestContext.WriteLine(" Aggregate Type Delay: {0}", AggregateTypeDelay); + + TestContext.WriteLine("Test Results"); + TestContext.WriteLine(" N Messages in DB: {0}", _dbContext.Messages.Count()); + TestContext.WriteLine(" Unpublished Count: {0}", _dbContext.Messages.Count(msg => msg.Published == 0)); + TestContext.WriteLine(" N Received in DB: {0}", + _dbContext.ReceivedMessages.Count(msg => msg.MessageId != null)); + foreach (Type eventType in _testEventConsumer.GetEventTypes()) + { + TestContext.WriteLine( + $" Received {eventType.Name} {_testEventConsumer.GetEventStatistics(eventType).MessageCount}"); + } + + TestContext.WriteLine(" Exception Count: {0}", _testEventConsumer.ExceptionCount); + + if (_interceptor != null) + { + TestContext.WriteLine("Message Interceptor Counts"); + TestContext.WriteLine(" Pre Send: {0}", _interceptor.PreSendCount); + TestContext.WriteLine(" Post Send: {0}", _interceptor.PostSendCount); + TestContext.WriteLine(" Pre Receive: {0}", _interceptor.PreReceiveCount); + TestContext.WriteLine(" Post Receive: {0}", _interceptor.PostReceiveCount); + TestContext.WriteLine(" Pre Handle: {0}", _interceptor.PreHandleCount); + TestContext.WriteLine(" Post Handle: {0}", _interceptor.PostHandleCount); + } + } + + /// + /// Set up the configuration for the HostBuilder + /// + protected void ConfigureFromEnvironmentAndSettingsFile(IConfigurationBuilder config, + Dictionary overrides = null) + { + config + .AddJsonFile(TestSettingsFile, false) + .AddEnvironmentVariables() + .AddInMemoryCollection(overrides); + } + + /// + /// Set up the configuration for the HostBuilder + /// + protected void ConfigureFromEnvironment(IConfigurationBuilder config, + Dictionary overrides = null) + { + config + .AddEnvironmentVariables() + .AddInMemoryCollection(overrides); + } + + protected IHost SetupTestHost(bool withInterceptor, + EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) + { + var host = new TestHostBuilder() + .SetConnectionString(TestSettings.ConnectionStrings.EventuateTramDbConnection) + .SetEventuateDatabaseSchemaName(EventuateDatabaseSchemaName) + .SetKafkaBootstrapServers(TestSettings.KafkaBootstrapServers) + .SetSubscriberId(_subscriberId) + .SetDomainEventHandlersFactory( + provider => + { + var consumer = provider.GetRequiredService(); + return consumer.DomainEventHandlers(AggregateType12, AggregateType34, AggregateTypeDelay); + }) + .SetConsumerConfigProperties(consumerConfigProperties) + .Build(withInterceptor); + host.StartAsync().Wait(); + return host; + } + + protected void DisposeTestHost() + { + if (_host == null) + return; + + _testServiceScope.Dispose(); + _host.StopAsync().Wait(); + _host.Dispose(); + _host = null; + _dbContext = null; + _domainEventPublisher = null; + _testEventConsumer = null; + } + + protected TestEventConsumer GetTestConsumer() + { + return _testEventConsumer; + } + + protected TestMessageInterceptor GetTestMessageInterceptor() + { + return _interceptor; + } + + protected IDomainEventPublisher GetTestPublisher() + { + return _domainEventPublisher; + } + + protected EventuateTramDbContext GetDbContext() + { + return _dbContext; + } + + protected static async Task ClearDbAsync(EventuateTramDbContext dbContext, string eventuateDatabaseSchemaName) + { + var cleanMessagesSql = $"Delete from [{eventuateDatabaseSchemaName}].[message]"; + await dbContext.Database.ExecuteSqlRawAsync(cleanMessagesSql); + var cleanReceivedMessagesSql = $"Delete from [{eventuateDatabaseSchemaName}].[received_messages]"; + await dbContext.Database.ExecuteSqlRawAsync(cleanReceivedMessagesSql); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs index b2e6532..20c0785 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/PerformanceTests.cs @@ -9,57 +9,59 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures { - [TestFixture] - public class PerformanceTests : IntegrationTestsBase - { - [SetUp] - public async Task Setup() - { - await CleanupKafkaTopicsAsync(); - TestSetup("eventuate", false, EventuateKafkaConsumerConfigurationProperties.Empty()); - await CleanupTestAsync(); - } + [TestFixture] + public class PerformanceTests : IntegrationTestsBase + { + [SetUp] + public async Task Setup() + { + await CleanupKafkaTopicsAsync(); + TestSetup("eventuate", false, EventuateKafkaConsumerConfigurationProperties.Empty()); + await CleanupTestAsync(); + } - [TearDown] - public void TearDown() - { - DisposeTestHost(); - } + [TearDown] + public void TearDown() + { + DisposeTestHost(); + } - [Test] - public void Send1000Message_Within1Minute() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestEventConsumer consumer = GetTestConsumer(); - TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics( - typeof(TestMessageType1)); + [Test] + public void Send1000Message_Within1Minute() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); + TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics( + typeof(TestMessageType1)); - // Act - for (int x = 0; x < 1000; x++) - { - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); - } + // Act + for (int x = 0; x < 1000; x++) + { + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + } - // Allow time for messages to process - int count = 300; - while (type1Statistics.MessageCount < 1000 && count > 0) - { - Thread.Sleep(1000); - count--; - } + // Allow time for messages to process + int count = 300; + while (type1Statistics.MessageCount < 1000 && count > 0) + { + Thread.Sleep(1000); + count--; + } - ShowTestResults(); + ShowTestResults(); - // Assert - Assert.AreEqual(1000, GetDbContext().Messages.Count(), "Expect 1000 messages produced"); - Assert.AreEqual(1000, type1Statistics.MessageCount, "Received by consumer count must be 1000"); - Assert.AreEqual(0, GetDbContext().Messages.Count(msg => msg.Published == 0), "No unpublished messages"); - Assert.AreEqual(1000, GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), "Expect 1000 messages received"); - Assert.Less(type1Statistics.GetDuration().TotalSeconds, 60.0, "Time to send 1000 messages"); + // Assert + Assert.That(GetDbContext().Messages.Count(), Is.EqualTo(1000), "Expect 1000 messages produced"); + Assert.That(type1Statistics.MessageCount, Is.EqualTo(1000), "Received by consumer count must be 1000"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), Is.EqualTo(0), + "No unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), Is.EqualTo(1000), + "Expect 1000 messages received"); + Assert.That(type1Statistics.GetDuration().TotalSeconds, Is.LessThan(60.0), "Time to send 1000 messages"); - TestContext.WriteLine("Performance Test completed in {0} seconds", - type1Statistics.GetDuration().TotalSeconds); - } - } -} + TestContext.WriteLine("Performance Test completed in {0} seconds", + type1Statistics.GetDuration().TotalSeconds); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs index 5449e03..1b4304c 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs @@ -11,411 +11,422 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures { - [TestFixture("eventuate")] - [TestFixture("schema1")] - public class ProducerConsumerIntegrationTests : IntegrationTestsBase - { - private readonly string _schema; - - public ProducerConsumerIntegrationTests(string schema) - { - _schema = schema; - } - - [SetUp] - public async Task Setup() - { - await CleanupKafkaTopicsAsync(); - TestSetup(_schema, true, EventuateKafkaConsumerConfigurationProperties.Empty()); - await CleanupTestAsync(); - } - - [TearDown] - public void TearDown() - { - DisposeTestHost(); - } - - [Test] - public void Publish_SingleSubscribedMessageType1_MessageReceived() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( - typeof(TestMessageType1)); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg1}); - - // Allow time for messages to process - AssertMessagesArePublishedAndConsumed(eventStatistics); - - msg1.AssertGoodMessageReceived(eventStatistics.ReceivedMessages[0]); - - GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); - } - - [Test] - public void Publish_SingleSubscribedMessageType2_MessageReceived() - { - // Arrange - TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); - TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( - typeof(TestMessageType2)); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg2}); - - // Allow time for messages to process - AssertMessagesArePublishedAndConsumed(eventStatistics); - - msg2.AssertGoodMessageReceived(eventStatistics.ReceivedMessages[0]); - - GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); - } - - [Test] - public void Publish_MultipleSubscribedMessageTypes_AllMessagesReceived() - { - // Arrange - int numberOfEvents = 5; - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); - TestMessageType3 msg3 = new TestMessageType3("Msg3", 3); - TestMessageType4 msg4 = new TestMessageType4("Msg4", 4); - TestMessageTypeDelay msgD = new TestMessageTypeDelay("MsgD", 5); - TestEventConsumer consumer = GetTestConsumer(); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg1}); - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg2}); - GetTestPublisher().Publish(AggregateType34, AggregateType34, new List {msg3}); - GetTestPublisher().Publish(AggregateType34, AggregateType34, new List {msg4}); - GetTestPublisher().Publish(AggregateTypeDelay, AggregateTypeDelay, new List {msgD}); - - // Allow time for messages to process - int count = 10; - while (consumer.TotalMessageCount() < numberOfEvents && count > 0) - { - TestContext.WriteLine($"TotalMessageCount: {consumer.TotalMessageCount()} ({count})"); - Thread.Sleep(1000); - count--; - } - TestContext.WriteLine($"TotalMessageCount: {consumer.TotalMessageCount()}"); - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(numberOfEvents), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - foreach (Type eventType in consumer.GetEventTypes()) - { - TestEventConsumer.EventStatistics eventStatistics = consumer.GetEventStatistics(eventType); - Assert.That(eventStatistics.MessageCount, - Is.EqualTo(1), $"Number of {eventType.Name} messages received by consumer"); - Assert.That(eventStatistics.ReceivedMessages.Count, - Is.EqualTo(1), $"Number of received {eventType.Name} messages"); - - } - - Assert.That(consumer.TotalMessageCount(), - Is.EqualTo(numberOfEvents), "Total number of messages received by consumer"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(numberOfEvents), "Number of received messages"); - Assert.Multiple(() => - { - msg1.AssertGoodMessageReceived(consumer.GetEventStatistics(typeof(TestMessageType1)).ReceivedMessages[0]); - msg2.AssertGoodMessageReceived(consumer.GetEventStatistics(typeof(TestMessageType2)).ReceivedMessages[0]); - msg3.AssertGoodMessageReceived(consumer.GetEventStatistics(typeof(TestMessageType3)).ReceivedMessages[0]); - msg4.AssertGoodMessageReceived(consumer.GetEventStatistics(typeof(TestMessageType4)).ReceivedMessages[0]); - msgD.AssertGoodMessageReceived(consumer.GetEventStatistics(typeof(TestMessageTypeDelay)).ReceivedMessages[0]); - }); - GetTestMessageInterceptor()?.AssertCounts(numberOfEvents, numberOfEvents, numberOfEvents, numberOfEvents, numberOfEvents, numberOfEvents); - } - - [Test] - public void Publish_SubscribedMessageTypeAndUnsubscribedMessageType_ReceivedOnlySubscribedMessageType() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestMessageUnsubscribedType unsubscribedMessage = new TestMessageUnsubscribedType("Msg3"); - TestEventConsumer consumer = GetTestConsumer(); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {unsubscribedMessage}); - // Send a following message to identify when we're done - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg1}); - - // Allow time for messages to process - int count = 10; - TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics(typeof(TestMessageType1)); - while (type1Statistics.MessageCount < 1 && count > 0) - { - Thread.Sleep(1000); - count--; - } - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(2), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(2), "Number of received messages"); - Assert.That(consumer.TotalMessageCount(), - Is.EqualTo(1), "Total number of messages received by consumer"); - - GetTestMessageInterceptor()?.AssertCounts(2, 2, 2, 2, 2, 2); - } - - [Test] - public void Publish_SubscribedTopicAndUnsubscribedTopic_ReceivesOnlySubscribedMessage() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestEventConsumer consumer = GetTestConsumer(); - - // Act - GetTestPublisher().Publish("BadTopic", "BadTopic", new List {msg1}); - // Send a following message to identify when we're done - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg1}); - - // Allow time for messages to process - int count = 10; - TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics(typeof(TestMessageType1)); - while (type1Statistics.MessageCount < 1 && count > 0) - { - Thread.Sleep(1000); - count--; - } - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(2), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(1), "Number of received messages"); - Assert.That(consumer.TotalMessageCount(), - Is.EqualTo(1), "Total number of messages received by consumer"); - - GetTestMessageInterceptor()?.AssertCounts(2, 2, 1, 1, 1, 1); - } - - [Test] - public void Publish_SubscriberThrowsExceptionOnFirstOfMultipleMessages_MessagesHandlingStops() - { - // Arrange - TestMessageType1 badmsg1 = new TestMessageType1("ThrowException", 1, 1.2); - TestMessageType2 msg2A = new TestMessageType2("Msg2a", 1); - TestMessageType2 msg2B = new TestMessageType2("Msg2b", 2); - TestEventConsumer consumer = GetTestConsumer(); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {badmsg1}); - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg2A}); - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg2B}); - - // Allow time for messages to process - int count = 10; - TestEventConsumer.EventStatistics type2Statistics = consumer.GetEventStatistics(typeof(TestMessageType2)); - while (type2Statistics.MessageCount < 2 && count > 0) - { - Thread.Sleep(1000); - count--; - } - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(3), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(0), "Number of received messages"); - Assert.That(consumer.TotalMessageCount(), - Is.EqualTo(1), "Total number of messages received by consumer"); - Assert.That(consumer.GetEventStatistics(typeof(TestMessageType1)).MessageCount, - Is.EqualTo(1), "Number of Type 1 messages received by consumer"); - Assert.That(consumer.GetEventStatistics(typeof(TestMessageType2)).MessageCount, - Is.EqualTo(0), "Number of Type 2 messages received by consumer"); - } - - [Test] - public void Publish_DefaultEventTypeName_CorrectEventTypeHeader() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 2, 3.3); - TestEventConsumer consumer = GetTestConsumer(); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg1}); - - // Allow time for messages to process - int count = 10; - TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics(typeof(TestMessageType1)); - while (type1Statistics.MessageCount < 1 && count > 0) - { - Thread.Sleep(1000); - count--; - } - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(1), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(1), "Number of received messages"); - Assert.That(type1Statistics.MessageCount, - Is.EqualTo(1), "Number of Type 1 messages received by consumer"); - Assert.That(type1Statistics.ReceivedMessages.Count, - Is.EqualTo(1), "Number of received type 1 messages"); - - msg1.AssertGoodMessageReceived(type1Statistics.ReceivedMessages[0]); - Assert.That(type1Statistics.ReceivedMessages[0].Message.GetHeader(EventMessageHeaders.EventType), - Is.EqualTo(typeof(TestMessageType1).FullName), "Event type header"); - - GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); - } - - [Test] - public void Publish_CustomEventTypeName_CorrectEventTypeHeader() - { - // Arrange - TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); - TestEventConsumer consumer = GetTestConsumer(); - - // Act - GetTestPublisher().Publish(AggregateType12, AggregateType12, new List {msg2}); - - // Allow time for messages to process - int count = 10; - TestEventConsumer.EventStatistics type2Statistics = consumer.GetEventStatistics(typeof(TestMessageType2)); - while (type2Statistics.MessageCount < 1 && count > 0) - { - Thread.Sleep(1000); - count--; - } - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(1), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(1), "Number of received messages"); - Assert.That(type2Statistics.MessageCount, - Is.EqualTo(1), "Number of Type 2 messages received by consumer"); - Assert.That(type2Statistics.ReceivedMessages.Count, - Is.EqualTo(1), "Number of received type 2 messages"); - - msg2.AssertGoodMessageReceived(type2Statistics.ReceivedMessages[0]); - Assert.That(type2Statistics.ReceivedMessages[0].Message.GetHeader(EventMessageHeaders.EventType), - Is.EqualTo(TestMessageType2.EventTypeName), "Event type header"); - - GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); - } - - [Test] - public async Task PublishAsync_SingleSubscribedMessageType1_MessageReceived() - { - // Arrange - TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); - TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( - typeof(TestMessageType1)); - - // Act - await GetTestPublisher().PublishAsync(AggregateType12, AggregateType12, new List {msg1}); - - // Allow time for messages to process - AssertMessagesArePublishedAndConsumed(eventStatistics); - - msg1.AssertGoodMessageReceived(eventStatistics.ReceivedMessages[0]); - - GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); - } - - [Test] - [TestCase(true)] - [TestCase(false)] - public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessageProcessedOrCanceledAndOtherNextMessageNotStarted(bool handlerCancelsWork) - { - // Arrange - TestEventConsumer testConsumer = GetTestConsumer(); - testConsumer.DelayHandlerCancels = handlerCancelsWork; - TestMessageTypeDelay msgA = new TestMessageTypeDelay($"msgA-{Guid.NewGuid()}", 1); - TestMessageTypeDelay msgB = new TestMessageTypeDelay($"msgB-{Guid.NewGuid()}", 2); - TestEventConsumer.EventStatistics eventStatistics = testConsumer.GetEventStatistics( - typeof(TestMessageTypeDelay)); - - // Act - Publish some type 3 messages - await GetTestPublisher().PublishAsync(AggregateTypeDelay, AggregateTypeDelay, new List {msgA, msgB}); - - // Wait for first message to be received and dispose test host - int count = 10; - while (eventStatistics.ReceivedMessages.Count < 1 && count > 0) - { - Thread.Sleep(1000); - count--; - } - string[] pingsBeforeShutdown = await File.ReadAllLinesAsync(PingFileName); - DisposeTestHost(); - string[] pingsAfterShutdown = await File.ReadAllLinesAsync(PingFileName); - - // Allow time to pass - Thread.Sleep(TestEventConsumer.MessageType3ProcessingDelay); - string[] delayedPings = await File.ReadAllLinesAsync(PingFileName); - - // Assert - First message processing started but not completed before shutdown - Assert.That(pingsBeforeShutdown.Length, Is.EqualTo(1)); - Assert.That(pingsBeforeShutdown[0], Contains.Substring("Received event").And.Contains(msgA.Name)); - // Verify first message processing completed or not by end of shutdown (shouldn't complete if handlerCancels) - int expectedNumberOfLinesAfterShutdown = handlerCancelsWork ? 1 : 2; - Assert.That(pingsAfterShutdown.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown)); - if (!handlerCancelsWork) - { - Assert.That(pingsAfterShutdown[1], Contains.Substring($"Processed event").And.Contains(msgA.Name)); - } - - // Verify second message not started after shutdown - Assert.That(delayedPings.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown)); - Assert.That(delayedPings, Is.EquivalentTo(pingsAfterShutdown)); - } - - private void AssertMessagesArePublishedAndConsumed(TestEventConsumer.EventStatistics eventStatistics) - { - int count = 10; - while (eventStatistics.MessageCount < 1 && count > 0) - { - Thread.Sleep(1000); - count--; - } - - ShowTestResults(); - - // Assert - Assert.That(GetDbContext().Messages.Count(), - Is.EqualTo(1), "Number of messages produced"); - Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), - Is.EqualTo(0), "Number of unpublished messages"); - Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), - Is.EqualTo(1), "Number of received messages"); - Assert.That(eventStatistics.MessageCount, - Is.EqualTo(1), "Number of Type 1 messages received by consumer"); - Assert.That(eventStatistics.ReceivedMessages.Count, - Is.EqualTo(1), "Number of received type 1 messages"); - } - } -} + [TestFixture("eventuate")] + [TestFixture("schema1")] + public class ProducerConsumerIntegrationTests : IntegrationTestsBase + { + private readonly string _schema; + + public ProducerConsumerIntegrationTests(string schema) + { + _schema = schema; + } + + [SetUp] + public async Task Setup() + { + await CleanupKafkaTopicsAsync(); + TestSetup(_schema, true, EventuateKafkaConsumerConfigurationProperties.Empty()); + await CleanupTestAsync(); + } + + [TearDown] + public void TearDown() + { + DisposeTestHost(); + } + + [Test] + public void Publish_SingleSubscribedMessageType1_MessageReceived() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( + typeof(TestMessageType1)); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + + // Allow time for messages to process + AssertMessagesArePublishedAndConsumed(eventStatistics); + + msg1.AssertGoodMessageReceived(eventStatistics.ReceivedMessages[0]); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public void Publish_SingleSubscribedMessageType2_MessageReceived() + { + // Arrange + TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); + TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( + typeof(TestMessageType2)); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg2 }); + + // Allow time for messages to process + AssertMessagesArePublishedAndConsumed(eventStatistics); + + msg2.AssertGoodMessageReceived(eventStatistics.ReceivedMessages[0]); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public void Publish_MultipleSubscribedMessageTypes_AllMessagesReceived() + { + // Arrange + int numberOfEvents = 5; + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); + TestMessageType3 msg3 = new TestMessageType3("Msg3", 3); + TestMessageType4 msg4 = new TestMessageType4("Msg4", 4); + TestMessageTypeDelay msgD = new TestMessageTypeDelay("MsgD", 5); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg2 }); + GetTestPublisher().Publish(AggregateType34, AggregateType34, new List { msg3 }); + GetTestPublisher().Publish(AggregateType34, AggregateType34, new List { msg4 }); + GetTestPublisher().Publish(AggregateTypeDelay, AggregateTypeDelay, new List { msgD }); + + // Allow time for messages to process + int count = 10; + while (consumer.TotalMessageCount() < numberOfEvents && count > 0) + { + TestContext.WriteLine($"TotalMessageCount: {consumer.TotalMessageCount()} ({count})"); + Thread.Sleep(1000); + count--; + } + + TestContext.WriteLine($"TotalMessageCount: {consumer.TotalMessageCount()}"); + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(numberOfEvents), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + foreach (Type eventType in consumer.GetEventTypes()) + { + TestEventConsumer.EventStatistics eventStatistics = consumer.GetEventStatistics(eventType); + Assert.That(eventStatistics.MessageCount, + Is.EqualTo(1), $"Number of {eventType.Name} messages received by consumer"); + Assert.That(eventStatistics.ReceivedMessages.Count, + Is.EqualTo(1), $"Number of received {eventType.Name} messages"); + } + + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(numberOfEvents), "Total number of messages received by consumer"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(numberOfEvents), "Number of received messages"); + Assert.Multiple(() => + { + msg1.AssertGoodMessageReceived( + consumer.GetEventStatistics(typeof(TestMessageType1)).ReceivedMessages[0]); + msg2.AssertGoodMessageReceived( + consumer.GetEventStatistics(typeof(TestMessageType2)).ReceivedMessages[0]); + msg3.AssertGoodMessageReceived( + consumer.GetEventStatistics(typeof(TestMessageType3)).ReceivedMessages[0]); + msg4.AssertGoodMessageReceived( + consumer.GetEventStatistics(typeof(TestMessageType4)).ReceivedMessages[0]); + msgD.AssertGoodMessageReceived(consumer.GetEventStatistics(typeof(TestMessageTypeDelay)) + .ReceivedMessages[0]); + }); + GetTestMessageInterceptor()?.AssertCounts(numberOfEvents, numberOfEvents, numberOfEvents, numberOfEvents, + numberOfEvents, numberOfEvents); + } + + [Test] + public void Publish_SubscribedMessageTypeAndUnsubscribedMessageType_ReceivedOnlySubscribedMessageType() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestMessageUnsubscribedType unsubscribedMessage = new TestMessageUnsubscribedType("Msg3"); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, + new List { unsubscribedMessage }); + // Send a following message to identify when we're done + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + + // Allow time for messages to process + int count = 10; + TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics(typeof(TestMessageType1)); + while (type1Statistics.MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(2), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(2), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(1), "Total number of messages received by consumer"); + + GetTestMessageInterceptor()?.AssertCounts(2, 2, 2, 2, 2, 2); + } + + [Test] + public void Publish_SubscribedTopicAndUnsubscribedTopic_ReceivesOnlySubscribedMessage() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish("BadTopic", "BadTopic", new List { msg1 }); + // Send a following message to identify when we're done + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + + // Allow time for messages to process + int count = 10; + TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics(typeof(TestMessageType1)); + while (type1Statistics.MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(2), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(1), "Total number of messages received by consumer"); + + GetTestMessageInterceptor()?.AssertCounts(2, 2, 1, 1, 1, 1); + } + + [Test] + public void Publish_SubscriberThrowsExceptionOnFirstOfMultipleMessages_MessagesHandlingStops() + { + // Arrange + TestMessageType1 badmsg1 = new TestMessageType1("ThrowException", 1, 1.2); + TestMessageType2 msg2A = new TestMessageType2("Msg2a", 1); + TestMessageType2 msg2B = new TestMessageType2("Msg2b", 2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { badmsg1 }); + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg2A }); + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg2B }); + + // Allow time for messages to process + int count = 10; + TestEventConsumer.EventStatistics type2Statistics = consumer.GetEventStatistics(typeof(TestMessageType2)); + while (type2Statistics.MessageCount < 2 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(3), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(0), "Number of received messages"); + Assert.That(consumer.TotalMessageCount(), + Is.EqualTo(1), "Total number of messages received by consumer"); + Assert.That(consumer.GetEventStatistics(typeof(TestMessageType1)).MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(consumer.GetEventStatistics(typeof(TestMessageType2)).MessageCount, + Is.EqualTo(0), "Number of Type 2 messages received by consumer"); + } + + [Test] + public void Publish_DefaultEventTypeName_CorrectEventTypeHeader() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 2, 3.3); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg1 }); + + // Allow time for messages to process + int count = 10; + TestEventConsumer.EventStatistics type1Statistics = consumer.GetEventStatistics(typeof(TestMessageType1)); + while (type1Statistics.MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(type1Statistics.MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(type1Statistics.ReceivedMessages.Count, + Is.EqualTo(1), "Number of received type 1 messages"); + + msg1.AssertGoodMessageReceived(type1Statistics.ReceivedMessages[0]); + Assert.That(type1Statistics.ReceivedMessages[0].Message.GetHeader(EventMessageHeaders.EventType), + Is.EqualTo(typeof(TestMessageType1).FullName), "Event type header"); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public void Publish_CustomEventTypeName_CorrectEventTypeHeader() + { + // Arrange + TestMessageType2 msg2 = new TestMessageType2("Msg2", 2); + TestEventConsumer consumer = GetTestConsumer(); + + // Act + GetTestPublisher().Publish(AggregateType12, AggregateType12, new List { msg2 }); + + // Allow time for messages to process + int count = 10; + TestEventConsumer.EventStatistics type2Statistics = consumer.GetEventStatistics(typeof(TestMessageType2)); + while (type2Statistics.MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(type2Statistics.MessageCount, + Is.EqualTo(1), "Number of Type 2 messages received by consumer"); + Assert.That(type2Statistics.ReceivedMessages.Count, + Is.EqualTo(1), "Number of received type 2 messages"); + + msg2.AssertGoodMessageReceived(type2Statistics.ReceivedMessages[0]); + Assert.That(type2Statistics.ReceivedMessages[0].Message.GetHeader(EventMessageHeaders.EventType), + Is.EqualTo(TestMessageType2.EventTypeName), "Event type header"); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + public async Task PublishAsync_SingleSubscribedMessageType1_MessageReceived() + { + // Arrange + TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2); + TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( + typeof(TestMessageType1)); + + // Act + await GetTestPublisher().PublishAsync(AggregateType12, AggregateType12, new List { msg1 }); + + // Allow time for messages to process + AssertMessagesArePublishedAndConsumed(eventStatistics); + + msg1.AssertGoodMessageReceived(eventStatistics.ReceivedMessages[0]); + + GetTestMessageInterceptor()?.AssertCounts(1, 1, 1, 1, 1, 1); + } + + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task + PublishAsync_TestHostShutdownDuringProcessing_CurrentMessageProcessedOrCanceledAndOtherNextMessageNotStarted( + bool handlerCancelsWork) + { + // Arrange + TestEventConsumer testConsumer = GetTestConsumer(); + testConsumer.DelayHandlerCancels = handlerCancelsWork; + TestMessageTypeDelay msgA = new TestMessageTypeDelay($"msgA-{Guid.NewGuid()}", 1); + TestMessageTypeDelay msgB = new TestMessageTypeDelay($"msgB-{Guid.NewGuid()}", 2); + TestEventConsumer.EventStatistics eventStatistics = testConsumer.GetEventStatistics( + typeof(TestMessageTypeDelay)); + + // Act - Publish some type 3 messages + await GetTestPublisher().PublishAsync(AggregateTypeDelay, AggregateTypeDelay, + new List { msgA, msgB }); + + // Wait for first message to be received and dispose test host + int count = 10; + while (eventStatistics.ReceivedMessages.Count < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + string[] pingsBeforeShutdown = await File.ReadAllLinesAsync(PingFileName); + DisposeTestHost(); + string[] pingsAfterShutdown = await File.ReadAllLinesAsync(PingFileName); + + // Allow time to pass + Thread.Sleep(TestEventConsumer.MessageType3ProcessingDelay); + string[] delayedPings = await File.ReadAllLinesAsync(PingFileName); + + // Assert - First message processing started but not completed before shutdown + Assert.That(pingsBeforeShutdown.Length, Is.EqualTo(1)); + Assert.That(pingsBeforeShutdown[0], Contains.Substring("Received event").And.Contains(msgA.Name)); + // Verify first message processing completed or not by end of shutdown (shouldn't complete if handlerCancels) + int expectedNumberOfLinesAfterShutdown = handlerCancelsWork ? 1 : 2; + Assert.That(pingsAfterShutdown.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown)); + if (!handlerCancelsWork) + { + Assert.That(pingsAfterShutdown[1], Contains.Substring($"Processed event").And.Contains(msgA.Name)); + } + + // Verify second message not started after shutdown + Assert.That(delayedPings.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown)); + Assert.That(delayedPings, Is.EquivalentTo(pingsAfterShutdown)); + } + + private void AssertMessagesArePublishedAndConsumed(TestEventConsumer.EventStatistics eventStatistics) + { + int count = 10; + while (eventStatistics.MessageCount < 1 && count > 0) + { + Thread.Sleep(1000); + count--; + } + + ShowTestResults(); + + // Assert + Assert.That(GetDbContext().Messages.Count(), + Is.EqualTo(1), "Number of messages produced"); + Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0), + Is.EqualTo(0), "Number of unpublished messages"); + Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null), + Is.EqualTo(1), "Number of received messages"); + Assert.That(eventStatistics.MessageCount, + Is.EqualTo(1), "Number of Type 1 messages received by consumer"); + Assert.That(eventStatistics.ReceivedMessages.Count, + Is.EqualTo(1), "Number of received type 1 messages"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/DynamicEventuateSchemaModelCacheKeyFactory.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/DynamicEventuateSchemaModelCacheKeyFactory.cs index 68dab73..1ef5bd8 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/DynamicEventuateSchemaModelCacheKeyFactory.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/DynamicEventuateSchemaModelCacheKeyFactory.cs @@ -10,7 +10,7 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers /// public class DynamicEventuateSchemaModelCacheKeyFactory : IModelCacheKeyFactory { - public object Create(DbContext context) + public object Create(DbContext context, bool designTime) { if (context is EventuateTramDbContext eventuateTramDbContext) { diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs index a973a7a..0a7d92e 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs @@ -11,148 +11,150 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - public class TestEventConsumer - { - private readonly ILogger _logger; + public class TestEventConsumer + { + private readonly ILogger _logger; - public static int MessageType3ProcessingDelay = 5000; + public static int MessageType3ProcessingDelay = 5000; /// /// Hold the statistics for a given message type. /// - public class EventStatistics - { - public int MessageCount { get; set; } - public DateTime FirstMessageTime { get; set; } - public DateTime LastMessageTime { get; set; } - public List> ReceivedMessages { get; set; } - public TimeSpan GetDuration() - { - return LastMessageTime > FirstMessageTime - ? LastMessageTime - FirstMessageTime - : TimeSpan.Zero; - } - - } - - private readonly Dictionary _statisticsForEvent; - public int ExceptionCount; - - public bool DelayHandlerCancels { get; set; } - - public TestEventConsumer(ILogger logger) - { - _logger = logger; - _statisticsForEvent = new Dictionary - { - {typeof(TestMessageType1), new EventStatistics()}, - {typeof(TestMessageType2), new EventStatistics()}, - {typeof(TestMessageType3), new EventStatistics()}, - {typeof(TestMessageType4), new EventStatistics()}, - {typeof(TestMessageTypeDelay), new EventStatistics()} - }; - } - - public EventStatistics GetEventStatistics(Type eventType) - { - return _statisticsForEvent[eventType]; - } - - public IEnumerable GetEventTypes() - { - return _statisticsForEvent.Keys; - } - - public void Reset() - { - foreach (EventStatistics eventStatistics in _statisticsForEvent.Values) - { - eventStatistics.MessageCount = 0; - eventStatistics.FirstMessageTime = DateTime.MaxValue; - eventStatistics.LastMessageTime = DateTime.MinValue; - eventStatistics.ReceivedMessages = new List>(); - } - ExceptionCount = 0; - DelayHandlerCancels = false; - } - - public int TotalMessageCount() - { - return _statisticsForEvent.Aggregate(0, (total, stat) => - total + stat.Value.MessageCount); - } - - public DomainEventHandlers DomainEventHandlers(String aggregateType12, - String aggregateType34, String aggregateTypeDelay) - { - return DomainEventHandlersBuilder.ForAggregateType(aggregateType12) - .OnEvent(HandleMessageType1EventAsync) - .OnEvent(HandleMessageType2EventAsync) - .AndForAggregateType(aggregateType34) - .OnEvent(HandleMessageType3EventAsync) - .OnEvent() - .AndForAggregateType(aggregateTypeDelay) - .OnEvent(HandleMessageTypeDelayEventAsync) - .Build(); - } - - private Task HandleMessageType1EventAsync(IDomainEventEnvelope @event) - { - _logger.LogDebug("Got MessageType1Event with id={} and value={}", @event.EventId, - @event.Event.ToString()); - EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageType1)); - HandleTestMessageEvent(@event, eventStatistics); - if (@event.Event.Name.Equals("ThrowException") && ExceptionCount < 5) - { - ExceptionCount++; - throw (new Exception()); - } - - return Task.CompletedTask; - } - - private Task HandleMessageType2EventAsync(IDomainEventEnvelope @event) - { - _logger.LogDebug("Got message MessageType2Event with id={} and value={}", @event.EventId, - @event.Event.ToString()); - EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageType2)); - HandleTestMessageEvent(@event, eventStatistics); - return Task.CompletedTask; - } - - private Task HandleMessageType3EventAsync(IDomainEventEnvelope @event, - IServiceProvider serviceProvider) - { - _logger.LogDebug("Got message MessageType3Event with id={} and value={}", @event.EventId, - @event.Event.ToString()); - EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageType3)); - HandleTestMessageEvent(@event, eventStatistics); - return Task.CompletedTask; - } - - private async Task HandleMessageTypeDelayEventAsync(IDomainEventEnvelope @event, - IServiceProvider serviceProvider, CancellationToken cancellationToken) - { - cancellationToken = DelayHandlerCancels ? cancellationToken : CancellationToken.None; - _logger.LogDebug("Got message TestMessageTypeDelay with id={} and value={}", @event.EventId, - @event.Event.ToString()); - await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, $"Received event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken); - EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageTypeDelay)); - HandleTestMessageEvent(@event, eventStatistics); - await Task.Delay(MessageType3ProcessingDelay, cancellationToken); - await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, $"Processed event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken); - } - - public void HandleTestMessageEvent(IDomainEventEnvelope @event, EventStatistics eventStatistics) - { - DateTime receivedTime = DateTime.Now; - if (receivedTime < eventStatistics.FirstMessageTime) - eventStatistics.FirstMessageTime = receivedTime; - if (receivedTime > eventStatistics.LastMessageTime) - eventStatistics.LastMessageTime = receivedTime; - eventStatistics.MessageCount++; - eventStatistics.ReceivedMessages.Add(@event); - } - - } -} + public class EventStatistics + { + public int MessageCount { get; set; } + public DateTime FirstMessageTime { get; set; } + public DateTime LastMessageTime { get; set; } + public List> ReceivedMessages { get; set; } + + public TimeSpan GetDuration() + { + return LastMessageTime > FirstMessageTime + ? LastMessageTime - FirstMessageTime + : TimeSpan.Zero; + } + } + + private readonly Dictionary _statisticsForEvent; + public int ExceptionCount; + + public bool DelayHandlerCancels { get; set; } + + public TestEventConsumer(ILogger logger) + { + _logger = logger; + _statisticsForEvent = new Dictionary + { + { typeof(TestMessageType1), new EventStatistics() }, + { typeof(TestMessageType2), new EventStatistics() }, + { typeof(TestMessageType3), new EventStatistics() }, + { typeof(TestMessageType4), new EventStatistics() }, + { typeof(TestMessageTypeDelay), new EventStatistics() } + }; + } + + public EventStatistics GetEventStatistics(Type eventType) + { + return _statisticsForEvent[eventType]; + } + + public IEnumerable GetEventTypes() + { + return _statisticsForEvent.Keys; + } + + public void Reset() + { + foreach (EventStatistics eventStatistics in _statisticsForEvent.Values) + { + eventStatistics.MessageCount = 0; + eventStatistics.FirstMessageTime = DateTime.MaxValue; + eventStatistics.LastMessageTime = DateTime.MinValue; + eventStatistics.ReceivedMessages = new List>(); + } + + ExceptionCount = 0; + DelayHandlerCancels = false; + } + + public int TotalMessageCount() + { + return _statisticsForEvent.Aggregate(0, (total, stat) => + total + stat.Value.MessageCount); + } + + public DomainEventHandlers DomainEventHandlers(String aggregateType12, + String aggregateType34, String aggregateTypeDelay) + { + return DomainEventHandlersBuilder.ForAggregateType(aggregateType12) + .OnEvent(HandleMessageType1EventAsync) + .OnEvent(HandleMessageType2EventAsync) + .AndForAggregateType(aggregateType34) + .OnEvent(HandleMessageType3EventAsync) + .OnEvent() + .AndForAggregateType(aggregateTypeDelay) + .OnEvent(HandleMessageTypeDelayEventAsync) + .Build(); + } + + private Task HandleMessageType1EventAsync(IDomainEventEnvelope @event) + { + _logger.LogDebug("Got MessageType1Event with id={} and value={}", @event.EventId, + @event.Event.ToString()); + EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageType1)); + HandleTestMessageEvent(@event, eventStatistics); + if (@event.Event.Name.Equals("ThrowException") && ExceptionCount < 5) + { + ExceptionCount++; + throw (new Exception()); + } + + return Task.CompletedTask; + } + + private Task HandleMessageType2EventAsync(IDomainEventEnvelope @event) + { + _logger.LogDebug("Got message MessageType2Event with id={} and value={}", @event.EventId, + @event.Event.ToString()); + EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageType2)); + HandleTestMessageEvent(@event, eventStatistics); + return Task.CompletedTask; + } + + private Task HandleMessageType3EventAsync(IDomainEventEnvelope @event, + IServiceProvider serviceProvider) + { + _logger.LogDebug("Got message MessageType3Event with id={} and value={}", @event.EventId, + @event.Event.ToString()); + EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageType3)); + HandleTestMessageEvent(@event, eventStatistics); + return Task.CompletedTask; + } + + private async Task HandleMessageTypeDelayEventAsync(IDomainEventEnvelope @event, + IServiceProvider serviceProvider, CancellationToken cancellationToken) + { + cancellationToken = DelayHandlerCancels ? cancellationToken : CancellationToken.None; + _logger.LogDebug("Got message TestMessageTypeDelay with id={} and value={}", @event.EventId, + @event.Event.ToString()); + await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, + $"Received event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken); + EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageTypeDelay)); + HandleTestMessageEvent(@event, eventStatistics); + await Task.Delay(MessageType3ProcessingDelay, cancellationToken); + await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, + $"Processed event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken); + } + + public void HandleTestMessageEvent(IDomainEventEnvelope @event, EventStatistics eventStatistics) + { + DateTime receivedTime = DateTime.Now; + if (receivedTime < eventStatistics.FirstMessageTime) + eventStatistics.FirstMessageTime = receivedTime; + if (receivedTime > eventStatistics.LastMessageTime) + eventStatistics.LastMessageTime = receivedTime; + eventStatistics.MessageCount++; + eventStatistics.ReceivedMessages.Add(@event); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs index ceb0870..15e1f5f 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestHostBuilder.cs @@ -10,89 +10,90 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - public class TestHostBuilder - { - private String _sqlConnectionString; - private String _eventuateDatabaseSchemaName; - private String _kafkaBootstrapServers; - private String _subscriberId; - private Func _domainEventHandlersFactory; - private EventuateKafkaConsumerConfigurationProperties _consumerConfigProperties = EventuateKafkaConsumerConfigurationProperties.Empty(); + public class TestHostBuilder + { + private String _sqlConnectionString; + private String _eventuateDatabaseSchemaName; + private String _kafkaBootstrapServers; + private String _subscriberId; + private Func _domainEventHandlersFactory; - private IHost _host; + private EventuateKafkaConsumerConfigurationProperties _consumerConfigProperties = + EventuateKafkaConsumerConfigurationProperties.Empty(); - public TestHostBuilder SetConnectionString(String sqlConnectionString) - { - _sqlConnectionString = sqlConnectionString; - return this; - } + private IHost _host; - public TestHostBuilder SetEventuateDatabaseSchemaName(String eventuateDatabaseSchemaName) - { - _eventuateDatabaseSchemaName = eventuateDatabaseSchemaName; - return this; - } + public TestHostBuilder SetConnectionString(String sqlConnectionString) + { + _sqlConnectionString = sqlConnectionString; + return this; + } - public TestHostBuilder SetKafkaBootstrapServers(String kafkaBootstrapServers) - { - _kafkaBootstrapServers = kafkaBootstrapServers; - return this; - } + public TestHostBuilder SetEventuateDatabaseSchemaName(String eventuateDatabaseSchemaName) + { + _eventuateDatabaseSchemaName = eventuateDatabaseSchemaName; + return this; + } - public TestHostBuilder SetSubscriberId(String subscriberId) - { - _subscriberId = subscriberId; - return this; - } + public TestHostBuilder SetKafkaBootstrapServers(String kafkaBootstrapServers) + { + _kafkaBootstrapServers = kafkaBootstrapServers; + return this; + } - public TestHostBuilder SetDomainEventHandlersFactory( - Func domainEventHandlersFactory) - { - _domainEventHandlersFactory = domainEventHandlersFactory; - return this; - } + public TestHostBuilder SetSubscriberId(String subscriberId) + { + _subscriberId = subscriberId; + return this; + } - public TestHostBuilder SetConsumerConfigProperties(EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) - { - _consumerConfigProperties = consumerConfigProperties; - return this; - } + public TestHostBuilder SetDomainEventHandlersFactory( + Func domainEventHandlersFactory) + { + _domainEventHandlersFactory = domainEventHandlersFactory; + return this; + } - - public IHost Build(bool withInterceptor) where TConsumerType : class - { - _host = new HostBuilder() - .UseDefaultServiceProvider(options => options.ValidateScopes = true) - .ConfigureServices((hostContext, services) => - { - services.AddDbContext((provider, o) => - { - o.UseSqlServer(_sqlConnectionString) - // Use a model cache key factory that ensures a new model is created if EventuateSchema is changed - .ReplaceService(); - }); - services.AddEventuateTramSqlKafkaTransport(_eventuateDatabaseSchemaName, _kafkaBootstrapServers, _consumerConfigProperties, - (provider, o) => - { - o.UseSqlServer(_sqlConnectionString); - }); - if (withInterceptor) - { - services.AddSingleton(); - } + public TestHostBuilder SetConsumerConfigProperties( + EventuateKafkaConsumerConfigurationProperties consumerConfigProperties) + { + _consumerConfigProperties = consumerConfigProperties; + return this; + } - // Publisher Setup - services.AddEventuateTramEventsPublisher(); - // Consumer Setup - services.AddSingleton(); - services.AddEventuateTramDomainEventDispatcher(_subscriberId, _domainEventHandlersFactory); - services.AddSingleton(); + public IHost Build(bool withInterceptor) where TConsumerType : class + { + _host = new HostBuilder() + .UseDefaultServiceProvider(options => options.ValidateScopes = true) + .ConfigureServices((hostContext, services) => + { + services.AddDbContext((provider, o) => + { + o.UseSqlServer(_sqlConnectionString) + // Use a model cache key factory that ensures a new model is created if EventuateSchema is changed + .ReplaceService(); + }); + services.AddEventuateTramSqlKafkaTransport(_eventuateDatabaseSchemaName, _kafkaBootstrapServers, + _consumerConfigProperties, + (provider, o) => { o.UseSqlServer(_sqlConnectionString); }); + if (withInterceptor) + { + services.AddSingleton(); + } - services.Configure((options) => options.ShutdownTimeout = TimeSpan.FromSeconds(30)); - }) - .Build(); - return _host; - } - } -} + // Publisher Setup + services.AddEventuateTramEventsPublisher(); + + // Consumer Setup + services.AddSingleton(); + services.AddEventuateTramDomainEventDispatcher(_subscriberId, _domainEventHandlersFactory); + services.AddSingleton(); + + services.Configure((options) => options.ShutdownTimeout = TimeSpan.FromSeconds(30)); + }) + .Build(); + return _host; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs index 2c8ee32..54a83bb 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageInterceptor.cs @@ -5,79 +5,86 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - public class TestMessageInterceptor : IMessageInterceptor - { - public int PreSendCount; - public int PostSendCount; - public int PreReceiveCount; - public int PreHandleCount; - public int PostHandleCount; - public int PostReceiveCount; + public class TestMessageInterceptor : IMessageInterceptor + { + public int PreSendCount; + public int PostSendCount; + public int PreReceiveCount; + public int PreHandleCount; + public int PostHandleCount; + public int PostReceiveCount; - public void Reset() - { - PreSendCount = 0; - PostSendCount = 0; - PreReceiveCount = 0; - PostReceiveCount = 0; - PreHandleCount = 0; - PostHandleCount = 0; - } + public void Reset() + { + PreSendCount = 0; + PostSendCount = 0; + PreReceiveCount = 0; + PostReceiveCount = 0; + PreHandleCount = 0; + PostHandleCount = 0; + } - public void AssertCounts(int preSend, int postSend, int preReceive, int postReceive, int preHandle, int postHandle) - { - Assert.AreEqual(preSend, PreSendCount, $"Message Interceptor PreSendCount value should be {preSend}"); - Assert.AreEqual(postSend, PostSendCount, $"Message Interceptor PostSendCount value should be {postSend}"); - Assert.AreEqual(preReceive, PreReceiveCount, $"Message Interceptor PreReceiveCount value should be {preReceive}"); - Assert.AreEqual(postReceive, PostReceiveCount, $"Message Interceptor PostReceiveCount value should be {postReceive}"); - Assert.AreEqual(preHandle, PreHandleCount, $"Message Interceptor PreHandleCount value should be {preHandle}"); - Assert.AreEqual(postHandle, PostHandleCount, $"Message Interceptor PostHandleCount value should be {postHandle}"); - } + public void AssertCounts(int preSend, int postSend, int preReceive, int postReceive, int preHandle, + int postHandle) + { + Assert.That(PreSendCount, Is.EqualTo(preSend), + $"Message Interceptor PreSendCount value should be {preSend}"); + Assert.That(PostSendCount, Is.EqualTo(postSend), + $"Message Interceptor PostSendCount value should be {postSend}"); + Assert.That(PreReceiveCount, Is.EqualTo(preReceive), + $"Message Interceptor PreReceiveCount value should be {preReceive}"); + Assert.That(PostReceiveCount, Is.EqualTo(postReceive), + $"Message Interceptor PostReceiveCount value should be {postReceive}"); + Assert.That(PreHandleCount, Is.EqualTo(preHandle), + $"Message Interceptor PreHandleCount value should be {preHandle}"); + Assert.That(PostHandleCount, Is.EqualTo(postHandle), + $"Message Interceptor PostHandleCount value should be {postHandle}"); + } - public void PreSend(IMessage message) - { - PreSendCount++; - } + public void PreSend(IMessage message) + { + PreSendCount++; + } - public Task PreSendAsync(IMessage message) - { - PreSend(message); - return Task.CompletedTask; - } + public Task PreSendAsync(IMessage message) + { + PreSend(message); + return Task.CompletedTask; + } - public void PostSend(IMessage message, Exception e) - { - PostSendCount++; - } + public void PostSend(IMessage message, Exception e) + { + PostSendCount++; + } - public Task PostSendAsync(IMessage message, Exception e) - { - PostSend(message, e); - return Task.CompletedTask; - } + public Task PostSendAsync(IMessage message, Exception e) + { + PostSend(message, e); + return Task.CompletedTask; + } - public Task PreReceiveAsync(IMessage message) - { - PreReceiveCount++; - return Task.CompletedTask; - } + public Task PreReceiveAsync(IMessage message) + { + PreReceiveCount++; + return Task.CompletedTask; + } - public Task PreHandleAsync(string subscriberId, IMessage message) - { - PreHandleCount++; - return Task.CompletedTask; - } + public Task PreHandleAsync(string subscriberId, IMessage message) + { + PreHandleCount++; + return Task.CompletedTask; + } - public Task PostHandleAsync(string subscriberId, IMessage message, Exception e) - { - PostHandleCount++; - return Task.CompletedTask; - } + public Task PostHandleAsync(string subscriberId, IMessage message, Exception e) + { + PostHandleCount++; + return Task.CompletedTask; + } - public Task PostReceiveAsync(IMessage message) - { - PostReceiveCount++; - return Task.CompletedTask; - } - } -} + public Task PostReceiveAsync(IMessage message) + { + PostReceiveCount++; + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs index 78ef8d2..a3eae17 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType1.cs @@ -6,26 +6,26 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - public class TestMessageType1 : IDomainEvent - { - public String Name { get; set; } - public int Value { get; set; } - public double Number { get; set; } + public class TestMessageType1 : IDomainEvent + { + public String Name { get; set; } + public int Value { get; set; } + public double Number { get; set; } - public TestMessageType1(String name, int value, double number) - { - Name = name; - Value = value; - Number = number; - } + public TestMessageType1(String name, int value, double number) + { + Name = name; + Value = value; + Number = number; + } - public void AssertGoodMessageReceived(IDomainEventEnvelope receivedMessage) - { - Assert.True(receivedMessage.Message.HasHeader(MessageHeaders.Id), "Message ID is in the header"); - TestMessageType1 @event = (TestMessageType1)receivedMessage.Event; - Assert.AreEqual(Name, @event.Name, "Message Name is the same"); - Assert.AreEqual(Value, @event.Value, "Message Value is the same"); - Assert.AreEqual(Number, @event.Number, "Message Number is the same"); - } - } -} + public void AssertGoodMessageReceived(IDomainEventEnvelope receivedMessage) + { + Assert.That(receivedMessage.Message.HasHeader(MessageHeaders.Id), "Message ID is in the header"); + TestMessageType1 @event = (TestMessageType1)receivedMessage.Event; + Assert.That(@event.Name, Is.EqualTo(Name), "Message Name is the same"); + Assert.That(@event.Value, Is.EqualTo(Value), "Message Value is the same"); + Assert.That(@event.Number, Is.EqualTo(Number), "Message Number is the same"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs index 0db5938..49bc04b 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType2.cs @@ -6,25 +6,26 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - [EventType(EventTypeName)] - public class TestMessageType2 : IDomainEvent - { - public const string EventTypeName = "testing.TestMessageType2"; - - public String Name { get; set; } - public int Value { get; set; } - public TestMessageType2(String name, int value) - { - Name = name; - Value = value; - } + [EventType(EventTypeName)] + public class TestMessageType2 : IDomainEvent + { + public const string EventTypeName = "testing.TestMessageType2"; - public void AssertGoodMessageReceived(IDomainEventEnvelope receivedMessage) - { - Assert.True(receivedMessage.Message.HasHeader(MessageHeaders.Id), "Message ID is in the header"); - TestMessageType2 @event = (TestMessageType2) receivedMessage.Event; - Assert.AreEqual(Name, @event.Name, "Message Name is the same"); - Assert.AreEqual(Value, @event.Value, "Message Value is the same"); - } - } -} + public String Name { get; set; } + public int Value { get; set; } + + public TestMessageType2(String name, int value) + { + Name = name; + Value = value; + } + + public void AssertGoodMessageReceived(IDomainEventEnvelope receivedMessage) + { + Assert.That(receivedMessage.Message.HasHeader(MessageHeaders.Id), "Message ID is in the header"); + TestMessageType2 @event = (TestMessageType2)receivedMessage.Event; + Assert.That(@event.Name, Is.EqualTo(Name), "Message Name is the same"); + Assert.That(@event.Value, Is.EqualTo(Value), "Message Value is the same"); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs index 9605f7c..d8854d5 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType3.cs @@ -3,14 +3,14 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - [EventType(EventTypeName)] - public class TestMessageType3 : TestMessageType2 - { - public new const string EventTypeName = "testing.TestMessageType3"; - - public TestMessageType3(String name, int value) - : base(name, value) - { - } - } -} + [EventType(EventTypeName)] + public class TestMessageType3 : TestMessageType2 + { + public new const string EventTypeName = "testing.TestMessageType3"; + + public TestMessageType3(String name, int value) + : base(name, value) + { + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType4.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType4.cs index b6dbba0..aef0e76 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType4.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageType4.cs @@ -11,14 +11,14 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - [EventType(EventTypeName)] - public class TestMessageType4 : TestMessageType2 - { - public new const string EventTypeName = "testing.TestMessageType4"; - - public TestMessageType4(String name, int value) - : base(name, value) - { - } - } -} + [EventType(EventTypeName)] + public class TestMessageType4 : TestMessageType2 + { + public new const string EventTypeName = "testing.TestMessageType4"; + + public TestMessageType4(String name, int value) + : base(name, value) + { + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageTypeDelay.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageTypeDelay.cs index 98fa8f4..7eb629a 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageTypeDelay.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageTypeDelay.cs @@ -11,14 +11,14 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - [EventType(EventTypeName)] - public class TestMessageTypeDelay : TestMessageType2 - { - public new const string EventTypeName = "testing.TestMessageTypeDelay"; - - public TestMessageTypeDelay(String name, int value) - : base(name, value) - { - } - } -} + [EventType(EventTypeName)] + public class TestMessageTypeDelay : TestMessageType2 + { + public new const string EventTypeName = "testing.TestMessageTypeDelay"; + + public TestMessageTypeDelay(String name, int value) + : base(name, value) + { + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageUnsubscribedType.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageUnsubscribedType.cs index 9f0f77d..e628141 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageUnsubscribedType.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestMessageUnsubscribedType.cs @@ -11,12 +11,13 @@ namespace IO.Eventuate.Tram.IntegrationTests.TestHelpers { - public class TestMessageUnsubscribedType : IDomainEvent - { - public String Name { get; set; } - public TestMessageUnsubscribedType(String name) - { - Name = name; - } - } -} + public class TestMessageUnsubscribedType : IDomainEvent + { + public String Name { get; set; } + + public TestMessageUnsubscribedType(String name) + { + Name = name; + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs b/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs index a8703ea..b1103ee 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestSettings.cs @@ -5,21 +5,22 @@ namespace IO.Eventuate.Tram.IntegrationTests /// public class TestSettings { - public string KafkaBootstrapServers { get; set; } - /// - /// Database connection strings - /// - public ConnectionStrings ConnectionStrings { get; set; } = new ConnectionStrings(); + public string KafkaBootstrapServers { get; set; } + + /// + /// Database connection strings + /// + public ConnectionStrings ConnectionStrings { get; set; } = new ConnectionStrings(); } - + /// /// Set of database connections /// public class ConnectionStrings { - /// - /// Eventuate Tram database connection string - /// - public string EventuateTramDbConnection { get; set; } + /// + /// Eventuate Tram database connection string + /// + public string EventuateTramDbConnection { get; set; } } } \ No newline at end of file diff --git a/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml b/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml index 9edc25c..a4ff969 100644 --- a/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml +++ b/IO.Eventuate.Tram.IntegrationTests/docker-compose.yml @@ -16,7 +16,7 @@ services: working_dir: /scripts entrypoint: [ "bash", "./entrypoint.sh" ] eventuatetramtests: - image: mcr.microsoft.com/dotnet/sdk:6.0 + image: mcr.microsoft.com/dotnet/sdk:8.0 depends_on: - zookeeper - kafka @@ -24,9 +24,9 @@ services: - cdcservice environment: KafkaBootstrapServers: "kafka:29092" - ConnectionStrings__EventuateTramDbConnection: "Server=mssql;Database=TramDb;User Id=sa;Password=TestPa55word" + ConnectionStrings__EventuateTramDbConnection: "Server=mssql;Database=TramDb;User Id=sa;Password=TestPa55word;TrustServerCertificate=True" volumes: - - ./bin/Release/net6.0:/app + - ./bin/Release/net8.0:/app working_dir: /app entrypoint: [ "dotnet", "test", "IO.Eventuate.Tram.IntegrationTests.dll", "--verbosity", "normal", "--logger", "trx" ] zookeeper: diff --git a/IO.Eventuate.Tram.IntegrationTests/testsettings.json b/IO.Eventuate.Tram.IntegrationTests/testsettings.json index 8f17f42..74017cc 100644 --- a/IO.Eventuate.Tram.IntegrationTests/testsettings.json +++ b/IO.Eventuate.Tram.IntegrationTests/testsettings.json @@ -1,6 +1,6 @@ { "KafkaBootstrapServers": "localhost:9092", "ConnectionStrings": { - "EventuateTramDbConnection": "Server=localhost,1433;Database=TramDb;User Id=sa;Password=TestPa55word" + "EventuateTramDbConnection": "Server=localhost,1433;Database=TramDb;User Id=sa;Password=TestPa55word;TrustServerCertificate=True" } } \ No newline at end of file diff --git a/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs b/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs index 3711519..2d39696 100644 --- a/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs +++ b/IO.Eventuate.Tram.UnitTests/Events/Subscriber/DomainEventDispatcherTests.cs @@ -14,66 +14,65 @@ namespace IO.Eventuate.Tram.UnitTests.Events.Subscriber { - public class DomainEventDispatcherTests - { - private const String SubscriberId = "123ABC"; - private const String AggregateType = "AggregateType"; + public class DomainEventDispatcherTests + { + private const String SubscriberId = "123ABC"; + private const String AggregateType = "AggregateType"; - private const String AggregateId = "xyz"; - private readonly String _messageId = "message-" + DateTime.Now; + private const String AggregateId = "xyz"; + private readonly String _messageId = "message-" + DateTime.Now; - private class MyTarget - { - public readonly ConcurrentQueue> Queue = new(); + private class MyTarget + { + public readonly ConcurrentQueue> Queue = new(); - public DomainEventHandlers DomainEventHandlers() - { - return DomainEventHandlersBuilder - .ForAggregateType(AggregateType) - .OnEvent(HandleAccountDebitedAsync) - .Build(); - } + public DomainEventHandlers DomainEventHandlers() + { + return DomainEventHandlersBuilder + .ForAggregateType(AggregateType) + .OnEvent(HandleAccountDebitedAsync) + .Build(); + } - private Task HandleAccountDebitedAsync(IDomainEventEnvelope message) - { - Queue.Enqueue(message); - return Task.CompletedTask; - } + private Task HandleAccountDebitedAsync(IDomainEventEnvelope message) + { + Queue.Enqueue(message); + return Task.CompletedTask; + } + } - } + private class MyDomainEvent : IDomainEvent + { + } - private class MyDomainEvent : IDomainEvent - { - } - - [Test] - public async Task MessageHandler_ValidMessage_RegisteredHandlerCalled() - { + [Test] + public async Task MessageHandler_ValidMessage_RegisteredHandlerCalled() + { // Arrange - MyTarget target = new MyTarget(); + MyTarget target = new MyTarget(); - var messageConsumer = Substitute.For(); - var serviceProvider = Substitute.For(); - var logger = Substitute.For>(); - var eventTypeNamingStrategy = Substitute.For(); - eventTypeNamingStrategy.GetEventTypeName(typeof(MyDomainEvent)).Returns(typeof(MyDomainEvent).FullName); + var messageConsumer = Substitute.For(); + var serviceProvider = Substitute.For(); + var logger = Substitute.For>(); + var eventTypeNamingStrategy = Substitute.For(); + eventTypeNamingStrategy.GetEventTypeName(typeof(MyDomainEvent)).Returns(typeof(MyDomainEvent).FullName); - DomainEventDispatcher dispatcher = new DomainEventDispatcher( - SubscriberId, target.DomainEventHandlers(), messageConsumer, eventTypeNamingStrategy, logger); + DomainEventDispatcher dispatcher = new DomainEventDispatcher( + SubscriberId, target.DomainEventHandlers(), messageConsumer, eventTypeNamingStrategy, logger); - await dispatcher.InitializeAsync(); + await dispatcher.InitializeAsync(); // Act - await dispatcher.MessageHandlerAsync(DomainEventPublisher.MakeMessageForDomainEvent(AggregateType, - AggregateId, new Dictionary {{ MessageHeaders.Id, _messageId } }, - new MyDomainEvent(), eventTypeNamingStrategy), serviceProvider, CancellationToken.None); + await dispatcher.MessageHandlerAsync(DomainEventPublisher.MakeMessageForDomainEvent(AggregateType, + AggregateId, new Dictionary { { MessageHeaders.Id, _messageId } }, + new MyDomainEvent(), eventTypeNamingStrategy), serviceProvider, CancellationToken.None); // Assert - Assert.True(target.Queue.TryPeek(out var dee)); - Assert.NotNull(dee); - Assert.AreEqual(AggregateId, dee.AggregateId); - Assert.AreEqual(AggregateType, dee.AggregateType); - Assert.AreEqual(_messageId, dee.EventId); - } - } -} + Assert.That(target.Queue.TryPeek(out var dee)); + Assert.That(dee, Is.Not.Null); + Assert.That(dee.AggregateId, Is.EqualTo(AggregateId)); + Assert.That(dee.AggregateType, Is.EqualTo(AggregateType)); + Assert.That(dee.EventId, Is.EqualTo(_messageId)); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj b/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj index 0880cb7..43aa470 100644 --- a/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj +++ b/IO.Eventuate.Tram.UnitTests/IO.Eventuate.Tram.UnitTests.csproj @@ -1,15 +1,15 @@ - net6.0 + net8.0 false - - - + + + diff --git a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs index 3367193..955361f 100644 --- a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs +++ b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/AbstractMessageProducerTests.cs @@ -7,45 +7,41 @@ namespace IO.Eventuate.Tram.UnitTests.Messaging.Producer { - public class AbstractMessageProducerTests - { - - public class MyMessageProducer : AbstractMessageProducer - { - public MyMessageProducer(IEnumerable messageInterceptors, - ILogger logger) - : base(messageInterceptors, logger) - { - - } - - public void Send(string destination, IMessage message, IMessageSender ms) - { - SendMessage("id", destination, message, ms); - } - - - } - - [Test] - public void Send_SimpleMessage_MessageHeadersAreApplied() - { + public class AbstractMessageProducerTests + { + public class MyMessageProducer : AbstractMessageProducer + { + public MyMessageProducer(IEnumerable messageInterceptors, + ILogger logger) + : base(messageInterceptors, logger) + { + } + + public void Send(string destination, IMessage message, IMessageSender ms) + { + SendMessage("id", destination, message, ms); + } + } + + [Test] + public void Send_SimpleMessage_MessageHeadersAreApplied() + { // Arrange - Message sendMessage = null; + Message sendMessage = null; - var ms = Substitute.For(); - ms.Send(Arg.Do(arg => sendMessage = arg)); + var ms = Substitute.For(); + ms.Send(Arg.Do(arg => sendMessage = arg)); // Act - MyMessageProducer mp = new MyMessageProducer(new List(), - Substitute.For()); - mp.Send("Destination", MessageBuilder.WithPayload("x").Build(), ms); + MyMessageProducer mp = new MyMessageProducer(new List(), + Substitute.For()); + mp.Send("Destination", MessageBuilder.WithPayload("x").Build(), ms); // Assert - Assert.NotNull(sendMessage); - Assert.NotNull(sendMessage.GetRequiredHeader(MessageHeaders.Id)); - Assert.NotNull(sendMessage.GetRequiredHeader(MessageHeaders.Destination)); - Assert.NotNull(sendMessage.GetRequiredHeader(MessageHeaders.Date)); - } - } + Assert.That(sendMessage, Is.Not.Null); + Assert.That(sendMessage.GetRequiredHeader(MessageHeaders.Id), Is.Not.Null); + Assert.That(sendMessage.GetRequiredHeader(MessageHeaders.Destination), Is.Not.Null); + Assert.That(sendMessage.GetRequiredHeader(MessageHeaders.Date), Is.Not.Null); + } + } } \ No newline at end of file diff --git a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs index a777caa..b6b9cf7 100644 --- a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs +++ b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/HttpDateHeaderFormatUtilTests.cs @@ -3,12 +3,12 @@ namespace IO.Eventuate.Tram.UnitTests.Messaging.Producer { - public class HttpDateHeaderFormatUtilTests - { - [Test] - public void NowAsHttpDateString_GetResult_NotNull() - { - Assert.NotNull((HttpDateHeaderFormatUtil.NowAsHttpDateString())); - } - } -} + public class HttpDateHeaderFormatUtilTests + { + [Test] + public void NowAsHttpDateString_GetResult_NotNull() + { + Assert.That(HttpDateHeaderFormatUtil.NowAsHttpDateString(), Is.Not.Null); + } + } +} \ No newline at end of file diff --git a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/MessageBuilderTests.cs b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/MessageBuilderTests.cs index df0cd86..736ff2c 100644 --- a/IO.Eventuate.Tram.UnitTests/Messaging/Producer/MessageBuilderTests.cs +++ b/IO.Eventuate.Tram.UnitTests/Messaging/Producer/MessageBuilderTests.cs @@ -15,8 +15,8 @@ public void WithMessage_MessageWithHeaders_MessageAndHeadersDuplicated() // Act Message sourceMessage = new Message("The payload", new Dictionary { - {"Header1", "HeaderValue" }, - {MessageHeaders.Id, "MyMessage" } + { "Header1", "HeaderValue" }, + { MessageHeaders.Id, "MyMessage" } }); Message builtMessage = MessageBuilder.WithMessage(sourceMessage).Build(); @@ -27,4 +27,4 @@ public void WithMessage_MessageWithHeaders_MessageAndHeadersDuplicated() Assert.That(builtMessage.Headers, Is.EquivalentTo(sourceMessage.Headers), "Headers"); } } -} +} \ No newline at end of file diff --git a/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj b/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj index afa5305..a6c7411 100644 --- a/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj +++ b/IO.Eventuate.Tram/IO.Eventuate.Tram.csproj @@ -1,16 +1,16 @@ - net6.0 + net8.0 0.4.0 https://github.com/eventuate-tram/eventuate-tram-core-dotnet - - - - + + + + diff --git a/README.md b/README.md index 7a61814..562c4cb 100644 --- a/README.md +++ b/README.md @@ -314,7 +314,7 @@ $ export CDC_SERVICE_DOCKER_VERSION= $ ./test.sh ``` -Test results will be written to ./bin/Release/net6.0/TestResults. +Test results will be written to ./bin/Release/net8.0/TestResults. ### Environment Variable Configuration