Skip to content

Commit

Permalink
Added Inline endpoint support back into MQTT. Closes GH-603
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Oct 20, 2023
1 parent e8df04e commit 471affe
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 8 deletions.
7 changes: 7 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Using Kafka


::: warning
The Kafka transport does not really support the "Requeue" error handling policy in Wolverine. "Requeue" in this case becomes
effectively an inline "Retry"
:::
5 changes: 5 additions & 0 deletions docs/guide/messaging/transports/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ using var host = await Host.CreateDefaultBuilder()
The MQTT transport *at this time* only supports endpoints that are either `Buffered` or `Durable`.
:::

::: warning
The MQTT transport does not really support the "Requeue" error handling policy in Wolverine. "Requeue" in this case becomes
effectively an inline "Retry"
:::

## Broadcast to User Defined Topics

As long as the MQTT transport is enabled in your application, you can explicitly publish messages to any named topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ public async Task has_response_topic_automatically()
transport.ReplyEndpoint().ShouldBeOfType<MqttTopic>().TopicName.ShouldBe(transport.ResponseTopic);
}

}
}


79 changes: 79 additions & 0 deletions src/Transports/MQTT/Wolverine.MQTT.Tests/inline_compliance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using Microsoft.Extensions.DependencyInjection;
using Shouldly;
using TestingSupport;
using TestingSupport.Compliance;
using Wolverine.MQTT.Internals;
using Wolverine.Runtime;

namespace Wolverine.MQTT.Tests;

public class InlineComplianceFixture : TransportComplianceFixture, IAsyncLifetime
{

public static int Number = 0;

public InlineComplianceFixture() : base(new Uri("mqtt://topic/receiver"), 120)
{

}

public async Task InitializeAsync()
{
var port = PortFinder.GetAvailablePort();

var number = ++Number;
var receiverTopic = "receiver-" + number;
var senderTopic = "sender-" + number;

Broker = new LocalMqttBroker(port)
{

};

await Broker.StartAsync();

OutboundAddress = new Uri("mqtt://topic/" + receiverTopic);

await SenderIs(opts =>
{
opts.UseMqttWithLocalBroker(port);

opts.ListenToMqttTopic(senderTopic).RetainMessages();

opts.PublishAllMessages().ToMqttTopic(receiverTopic).RetainMessages().SendInline();
});

await ReceiverIs(opts =>
{
opts.UseMqttWithLocalBroker(port);

opts.ListenToMqttTopic(receiverTopic).Named("receiver").RetainMessages().ProcessInline();
});
}

public LocalMqttBroker Broker { get; private set; }

public async Task DisposeAsync()
{
await Broker.StopAsync();
await Broker.DisposeAsync();
await DisposeAsync();
}
}

[Collection("acceptance")]
public class InlineSendingAndReceivingCompliance : TransportCompliance<InlineComplianceFixture>
{
[Fact]
public void has_response_topic_automatically()
{
var options = theSender.Services.GetRequiredService<IWolverineRuntime>().Options;
var transport = options.Transports
.GetOrCreate<MqttTransport>();

transport.ResponseTopic.ShouldBe("wolverine/response/" + options.Durability.AssignedNodeNumber);

transport.ReplyEndpoint().ShouldBeOfType<MqttTopic>().TopicName.ShouldBe(transport.ResponseTopic);
}

}
4 changes: 2 additions & 2 deletions src/Transports/MQTT/Wolverine.MQTT/Internals/MqttListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public MqttListener(MqttTransport broker, ILogger logger, MqttTopic topic, IRece
envelope.IsAcked = true;
}

// Resend
await _topic.SendAsync(envelope);
// Really just an inline retry
await _receiver.ReceivedAsync(this, envelope);
}, logger, _cancellation.Token);
}

Expand Down
5 changes: 0 additions & 5 deletions src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ internal ManagedMqttApplicationMessage BuildMessage(Envelope envelope)
return message;
}

protected override bool supportsMode(EndpointMode mode)
{
return mode != EndpointMode.Inline;
}

public ValueTask SendAsync(Envelope envelope)
{
var message = BuildMessage(envelope);
Expand Down

0 comments on commit 471affe

Please sign in to comment.