diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index c3f0f98c..af95cb4f 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -610,8 +610,14 @@ public virtual Task ProduceResponse(string requestId, object request, IReadOnlyD if (consumerInvoker == null) throw new ArgumentNullException(nameof(consumerInvoker)); var responseType = consumerInvoker.ParentSettings.ResponseType; + if (!requestHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out object replyTo)) + { + _logger.LogDebug($$"""Skipping sending response {Response} of type {MessageType} as the header {{ReqRespMessageHeaders.ReplyTo}} is missing for RequestId: {RequestId}""", response, responseType, requestId); + return Task.CompletedTask; + } + _logger.LogDebug("Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...", response, responseType, requestId); - + var responseHeaders = CreateHeaders(); responseHeaders.SetHeader(ReqRespMessageHeaders.RequestId, requestId); if (responseException != null) @@ -619,11 +625,6 @@ public virtual Task ProduceResponse(string requestId, object request, IReadOnlyD responseHeaders.SetHeader(ReqRespMessageHeaders.Error, responseException.Message); } - if (!requestHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out object replyTo)) - { - throw new MessageBusException($"The header {ReqRespMessageHeaders.ReplyTo} was missing on the message"); - } - _headerService.AddMessageTypeHeader(response, responseHeaders); return ProduceToTransport(response, responseType, (string)replyTo, responseHeaders, null); diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs index 88a29026..7795a37f 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs @@ -1,4 +1,9 @@ namespace SlimMessageBus.Host.Test; + +using System.Threading; + +using Moq.Protected; + using SlimMessageBus.Host.Test.Common; public class MessageBusBaseTests : IDisposable @@ -655,4 +660,50 @@ public async Task When_Stop_Given_ConcurrentCalls_Then_ItOnlyStopsConsumersOnce( bus._startedCount.Should().Be(1); bus._stoppedCount.Should().Be(1); } + + public class ProduceResponseTests + { + [Fact] + public async Task When_Given_NoReplyToHeader_DoNothing() + { + // arrange + var requestId = "req-123"; + var request = new object(); + var response = new object(); + + object value; + var mockRequestHeaders = new Mock>(); + mockRequestHeaders.Setup(x => x.TryGetValue(ReqRespMessageHeaders.ReplyTo, out value)).Returns(false).Verifiable(Times.Once); + + var mockMessageTypeResolver = new Mock(); + + var mockServiceProvider = new Mock(); + mockServiceProvider.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(mockMessageTypeResolver.Object); + + var mockMessageTypeConsumerInvokerSettings = new Mock(); + mockMessageTypeConsumerInvokerSettings.SetupGet(x => x.ParentSettings).Returns(() => new ConsumerSettings() { ResponseType = response.GetType() }); + + var settings = new MessageBusSettings { ServiceProvider = mockServiceProvider.Object }; + + var mockMessageBus = new Mock(settings) { CallBase = true }; + mockMessageBus.Protected().Setup>>( + "ProduceToTransportBulk", + [typeof(BulkMessageEnvelope)], + false, + ItExpr.IsAny>(), + ItExpr.IsAny(), + ItExpr.IsAny(), + ItExpr.IsAny()) + .Verifiable(Times.Never); + + var target = mockMessageBus.Object; + + // act + await target.ProduceResponse(requestId, request, mockRequestHeaders.Object, response, null, mockMessageTypeConsumerInvokerSettings.Object); + + // assert + mockRequestHeaders.VerifyAll(); + mockMessageBus.VerifyAll(); + } + } }