diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExpression.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExpression.cs
index 6a7e468f1..a058a01cd 100644
--- a/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExpression.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExpression.cs
@@ -1,3 +1,6 @@
+using JasperFx.Core.Reflection;
+using Microsoft.Extensions.Options;
+using Wolverine.Configuration;
using Wolverine.MQTT.Internals;
namespace Wolverine.MQTT;
@@ -5,9 +8,71 @@ namespace Wolverine.MQTT;
public class MqttTransportExpression
{
private readonly MqttTransport _transport;
+ private readonly WolverineOptions _options;
- internal MqttTransportExpression(MqttTransport transport)
+ internal MqttTransportExpression(MqttTransport transport, WolverineOptions options)
{
_transport = transport;
+ _options = options;
+ }
+
+ ///
+ /// Apply a policy to all listening endpoints
+ ///
+ ///
+ ///
+ public MqttTransportExpression ConfigureListeners(Action configure)
+ {
+ var policy = new LambdaEndpointPolicy((e, _) =>
+ {
+ if (e.Role == EndpointRole.System)
+ {
+ return;
+ }
+
+ if (!e.IsListener)
+ {
+ return;
+ }
+
+ var configuration = new MqttListenerConfiguration(e);
+ configure(configuration);
+
+ configuration!.As().Apply();
+ });
+
+ _options.Policies.Add(policy);
+
+ return this.As();
+ }
+
+ ///
+ /// Apply a policy to all MQTT subscribers
+ ///
+ ///
+ ///
+ public MqttTransportExpression ConfigureSenders(Action configure)
+ {
+ var policy = new LambdaEndpointPolicy((e, _) =>
+ {
+ if (e.Role == EndpointRole.System)
+ {
+ return;
+ }
+
+ if (!e.Subscriptions.Any())
+ {
+ return;
+ }
+
+ var configuration = new MqttSubscriberConfiguration(e);
+ configure(configuration);
+
+ configuration!.As().Apply();
+ });
+
+ _options.Policies.Add(policy);
+
+ return this.As();
}
}
\ No newline at end of file
diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
index 54ae3cbe0..808ea22d2 100644
--- a/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
@@ -35,7 +35,7 @@ public static MqttTransportExpression UseMqtt(this WolverineOptions options,
transport.Options = builder.Build();
- return new MqttTransportExpression(transport);
+ return new MqttTransportExpression(transport, options);
}
///
@@ -50,7 +50,7 @@ public static MqttTransportExpression UseMqtt(this WolverineOptions options, Man
transport.Options = mqttOptions;
- return new MqttTransportExpression(transport);
+ return new MqttTransportExpression(transport, options);
}
///