diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md new file mode 100644 index 000000000..07faea0ca --- /dev/null +++ b/docs/guide/messaging/transports/kafka.md @@ -0,0 +1,7 @@ +# Using Kafka + + +::: warning +The Kafka transport does not really support the "Requeue" error handling policy in Wolverine. "Requeue" in this case becomes +effectively an inline "Retry" +::: \ No newline at end of file diff --git a/docs/guide/messaging/transports/mqtt.md b/docs/guide/messaging/transports/mqtt.md index 25d6fad96..1cb43596d 100644 --- a/docs/guide/messaging/transports/mqtt.md +++ b/docs/guide/messaging/transports/mqtt.md @@ -57,6 +57,11 @@ using var host = await Host.CreateDefaultBuilder() The MQTT transport *at this time* only supports endpoints that are either `Buffered` or `Durable`. ::: +::: warning +The MQTT transport does not really support the "Requeue" error handling policy in Wolverine. "Requeue" in this case becomes +effectively an inline "Retry" +::: + ## Broadcast to User Defined Topics As long as the MQTT transport is enabled in your application, you can explicitly publish messages to any named topic diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/buffered_compliance.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/buffered_compliance.cs index eba9eebc8..3a1c701d9 100644 --- a/src/Transports/MQTT/Wolverine.MQTT.Tests/buffered_compliance.cs +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/buffered_compliance.cs @@ -80,4 +80,6 @@ public async Task has_response_topic_automatically() transport.ReplyEndpoint().ShouldBeOfType().TopicName.ShouldBe(transport.ResponseTopic); } -} \ No newline at end of file +} + + diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/inline_compliance.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/inline_compliance.cs new file mode 100644 index 000000000..8c58e40df --- /dev/null +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/inline_compliance.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using TestingSupport; +using TestingSupport.Compliance; +using Wolverine.MQTT.Internals; +using Wolverine.Runtime; + +namespace Wolverine.MQTT.Tests; + +public class InlineComplianceFixture : TransportComplianceFixture, IAsyncLifetime +{ + + public static int Number = 0; + + public InlineComplianceFixture() : base(new Uri("mqtt://topic/receiver"), 120) + { + + } + + public async Task InitializeAsync() + { + var port = PortFinder.GetAvailablePort(); + + var number = ++Number; + var receiverTopic = "receiver-" + number; + var senderTopic = "sender-" + number; + + Broker = new LocalMqttBroker(port) + { + + }; + + await Broker.StartAsync(); + + OutboundAddress = new Uri("mqtt://topic/" + receiverTopic); + + await SenderIs(opts => + { + opts.UseMqttWithLocalBroker(port); + + opts.ListenToMqttTopic(senderTopic).RetainMessages(); + + opts.PublishAllMessages().ToMqttTopic(receiverTopic).RetainMessages().SendInline(); + }); + + await ReceiverIs(opts => + { + opts.UseMqttWithLocalBroker(port); + + opts.ListenToMqttTopic(receiverTopic).Named("receiver").RetainMessages().ProcessInline(); + }); + } + + public LocalMqttBroker Broker { get; private set; } + + public async Task DisposeAsync() + { + await Broker.StopAsync(); + await Broker.DisposeAsync(); + await DisposeAsync(); + } +} + +[Collection("acceptance")] +public class InlineSendingAndReceivingCompliance : TransportCompliance +{ + [Fact] + public void has_response_topic_automatically() + { + var options = theSender.Services.GetRequiredService().Options; + var transport = options.Transports + .GetOrCreate(); + + transport.ResponseTopic.ShouldBe("wolverine/response/" + options.Durability.AssignedNodeNumber); + + transport.ReplyEndpoint().ShouldBeOfType().TopicName.ShouldBe(transport.ResponseTopic); + } + +} \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttListener.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttListener.cs index 499db9473..acef3b891 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttListener.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttListener.cs @@ -43,8 +43,8 @@ public MqttListener(MqttTransport broker, ILogger logger, MqttTopic topic, IRece envelope.IsAcked = true; } - // Resend - await _topic.SendAsync(envelope); + // Really just an inline retry + await _receiver.ReceivedAsync(this, envelope); }, logger, _cancellation.Token); } diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs index 594daeb75..694aa87d3 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs @@ -93,11 +93,6 @@ internal ManagedMqttApplicationMessage BuildMessage(Envelope envelope) return message; } - protected override bool supportsMode(EndpointMode mode) - { - return mode != EndpointMode.Inline; - } - public ValueTask SendAsync(Envelope envelope) { var message = BuildMessage(envelope);