Skip to content

Commit

Permalink
Test handler canceled
Browse files Browse the repository at this point in the history
US592494
  • Loading branch information
douggish committed Sep 18, 2023
1 parent 4856680 commit 1c352ff
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,16 @@ public async Task PublishAsync_SingleSubscribedMessageType1_MessageReceived()
}

[Test]
public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessageProcessedAndOtherNextMessageNotStarted()
[TestCase(true)]
[TestCase(false)]
public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessageProcessedOrCanceledAndOtherNextMessageNotStarted(bool handlerCancelsWork)
{
// Arrange
TestEventConsumer testConsumer = GetTestConsumer();
testConsumer.DelayHandlerCancels = handlerCancelsWork;
TestMessageTypeDelay msgA = new TestMessageTypeDelay($"msgA-{Guid.NewGuid()}", 1);
TestMessageTypeDelay msgB = new TestMessageTypeDelay($"msgB-{Guid.NewGuid()}", 2);
TestEventConsumer.EventStatistics eventStatistics = GetTestConsumer().GetEventStatistics(
TestEventConsumer.EventStatistics eventStatistics = testConsumer.GetEventStatistics(
typeof(TestMessageTypeDelay));

// Act - Publish some type 3 messages
Expand All @@ -377,11 +381,16 @@ public async Task PublishAsync_TestHostShutdownDuringProcessing_CurrentMessagePr
// Assert - First message processing started but not completed before shutdown
Assert.That(pingsBeforeShutdown.Length, Is.EqualTo(1));
Assert.That(pingsBeforeShutdown[0], Contains.Substring("Received event").And.Contains(msgA.Name));
// Verify first message processing completed by end of shutdown
Assert.That(pingsAfterShutdown.Length, Is.EqualTo(2));
Assert.That(pingsAfterShutdown[1], Contains.Substring($"Processed event").And.Contains(msgA.Name));
// Verify first message processing completed or not by end of shutdown (shouldn't complete if handlerCancels)
int expectedNumberOfLinesAfterShutdown = handlerCancelsWork ? 1 : 2;
Assert.That(pingsAfterShutdown.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown));
if (!handlerCancelsWork)
{
Assert.That(pingsAfterShutdown[1], Contains.Substring($"Processed event").And.Contains(msgA.Name));
}

// Verify second message not started after shutdown
Assert.That(delayedPings.Length, Is.EqualTo(2));
Assert.That(delayedPings.Length, Is.EqualTo(expectedNumberOfLinesAfterShutdown));
Assert.That(delayedPings, Is.EquivalentTo(pingsAfterShutdown));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public TimeSpan GetDuration()

private readonly Dictionary<Type, EventStatistics> _statisticsForEvent;
public int ExceptionCount;

public bool DelayHandlerCancels { get; set; }

public TestEventConsumer(ILogger<TestEventConsumer> logger)
{
Expand Down Expand Up @@ -71,6 +73,7 @@ public void Reset()
eventStatistics.ReceivedMessages = new List<IDomainEventEnvelope<IDomainEvent>>();
}
ExceptionCount = 0;
DelayHandlerCancels = false;
}

public int TotalMessageCount()
Expand Down Expand Up @@ -127,17 +130,17 @@ private Task HandleMessageType3EventAsync(IDomainEventEnvelope<TestMessageType3>
return Task.CompletedTask;
}

private Task HandleMessageTypeDelayEventAsync(IDomainEventEnvelope<TestMessageTypeDelay> @event,
IServiceProvider serviceProvider)
private async Task HandleMessageTypeDelayEventAsync(IDomainEventEnvelope<TestMessageTypeDelay> @event,
IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
cancellationToken = DelayHandlerCancels ? cancellationToken : CancellationToken.None;
_logger.LogDebug("Got message TestMessageTypeDelay with id={} and value={}", @event.EventId,
@event.Event.ToString());
File.AppendAllText(IntegrationTestsBase.PingFileName, $"Received event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n");
await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, $"Received event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken);
EventStatistics eventStatistics = GetEventStatistics(typeof(TestMessageTypeDelay));
HandleTestMessageEvent(@event, eventStatistics);
Thread.Sleep(MessageType3ProcessingDelay);
File.AppendAllText(IntegrationTestsBase.PingFileName, $"Processed event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n");
return Task.CompletedTask;
await Task.Delay(MessageType3ProcessingDelay, cancellationToken);
await File.AppendAllTextAsync(IntegrationTestsBase.PingFileName, $"Processed event '{@event.Message.Payload}' with value '{@event.Event.Value}'\n", cancellationToken);
}

public void HandleTestMessageEvent(IDomainEventEnvelope<IDomainEvent> @event, EventStatistics eventStatistics)
Expand Down

0 comments on commit 1c352ff

Please sign in to comment.