diff --git a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs index 35b29a1..b91fa19 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestFixtures/ProducerConsumerIntegrationTests.cs @@ -348,12 +348,16 @@ public async Task PublishAsync_SingleSubscribedMessageType1_MessageReceived() } [Test] - public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessageProcessedAndOtherNextMessageNotStarted() + [TestCase(true)] + [TestCase(false)] + public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessageProcessedOrCanceledAndOtherNextMessageNotStarted(bool handlerCancelsWork) { // Arrange + TestEventConsumer testConsumer = GetTestConsumer(); + testConsumer.DelayHandlerCancels = handlerCancelsWork; TestMessageTypeDelay msgA = new TestMessageTypeDelay($"msgA-{Guid.NewGuid()}", 1); TestMessageTypeDelay msgB = new TestMessageTypeDelay($"msgB-{Guid.NewGuid()}", 2); - TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics( + TestEventConsumer.EventStatistics eventStatistics = testConsumer.GetEventStatistics( typeof(TestMessageTypeDelay)); // Act - Publish some type 3 messages @@ -377,11 +381,16 @@ public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessagePr // Assert - First message processing started but not completed before shutdown Assert.That(pingsBeforeShutdown.Length, Is.EqualTo(1)); Assert.That(pingsBeforeShutdown[0], Contains.Substring("Received event").And.Contains(msgA.Name)); - // Verify first message processing completed by end of shutdown - Assert.That(pingsAfterShutdown.Length, Is.EqualTo(2)); - Assert.That(pingsAfterShutdown[1], Contains.Substring($"Processed event").And.Contains(msgA.Name)); + // Verify first message processing completed or not by end of shutdown (shouldn't complete if handlerCancels) + int expectedNumberOfLinesAfterShutdown = handlerCancelsWork ? 1 : 2; + Assert.That(pingsAfterShutdown.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown)); + if (!handlerCancelsWork) + { + Assert.That(pingsAfterShutdown[1], Contains.Substring($"Processed event").And.Contains(msgA.Name)); + } + // Verify second message not started after shutdown - Assert.That(delayedPings.Length, Is.EqualTo(2)); + Assert.That(delayedPings.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown)); Assert.That(delayedPings, Is.EquivalentTo(pingsAfterShutdown)); } diff --git a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs index 89cd0f5..a973a7a 100644 --- a/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs +++ b/IO.Eventuate.Tram.IntegrationTests/TestHelpers/TestEventConsumer.cs @@ -37,6 +37,8 @@ public TimeSpan GetDuration() private readonly Dictionary _statisticsForEvent; public int ExceptionCount; + + public bool DelayHandlerCancels { get; set; } public TestEventConsumer(ILogger logger) { @@ -71,6 +73,7 @@ public void Reset() eventStatistics.ReceivedMessages = new List>(); } ExceptionCount = 0; + DelayHandlerCancels = false; } public int TotalMessageCount() @@ -127,17 +130,17 @@ private Task HandleMessageType3EventAsync(IDomainEventEnvelope return Task.CompletedTask; } - private Task HandleMessageTypeDelayEventAsync(IDomainEventEnvelope @event, - IServiceProvider serviceProvider) + private async Task HandleMessageTypeDelayEventAsync(IDomainEventEnvelope @event, + IServiceProvider serviceProvider, CancellationToken cancellationToken) { + cancellationToken = DelayHandlerCancels ? cancellationToken : CancellationToken.None; _logger.LogDebug("Got message TestMessageTypeDelay with id={} and value={}", @event.EventId, @event.Event.ToString()); - File.AppendAllText(IntegrationTestsBase.PingFileName, $"Received event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n"); + await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, $"Received event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken); EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageTypeDelay)); HandleTestMessageEvent(@event, eventStatistics); - Thread.Sleep(MessageType3ProcessingDelay); - File.AppendAllText(IntegrationTestsBase.PingFileName, $"Processed event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n"); - return Task.CompletedTask; + await Task.Delay(MessageType3ProcessingDelay, cancellationToken); + await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, $"Processed event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken); } public void HandleTestMessageEvent(IDomainEventEnvelope @event, EventStatistics eventStatistics)