Skip to content

Commit

Permalink
Adding conventional listener and subscriber endpoint configuration to…
Browse files Browse the repository at this point in the history
… MQTT. Closes GH-598
  • Loading branch information
jeremydmiller committed Oct 20, 2023
1 parent 35da768 commit 5d9cf21
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
67 changes: 66 additions & 1 deletion src/Transports/MQTT/Wolverine.MQTT/MqttTransportExpression.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,78 @@
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Options;
using Wolverine.Configuration;
using Wolverine.MQTT.Internals;

namespace Wolverine.MQTT;

public class MqttTransportExpression
{
private readonly MqttTransport _transport;
private readonly WolverineOptions _options;

internal MqttTransportExpression(MqttTransport transport)
internal MqttTransportExpression(MqttTransport transport, WolverineOptions options)
{
_transport = transport;
_options = options;
}

/// <summary>
/// Apply a policy to all listening endpoints
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public MqttTransportExpression ConfigureListeners(Action<MqttListenerConfiguration> configure)
{
var policy = new LambdaEndpointPolicy<MqttTopic>((e, _) =>
{
if (e.Role == EndpointRole.System)
{
return;
}

if (!e.IsListener)
{
return;
}

var configuration = new MqttListenerConfiguration(e);
configure(configuration);

configuration!.As<IDelayedEndpointConfiguration>().Apply();
});

_options.Policies.Add(policy);

return this.As<MqttTransportExpression>();
}

/// <summary>
/// Apply a policy to all MQTT subscribers
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public MqttTransportExpression ConfigureSenders(Action<MqttSubscriberConfiguration> configure)
{
var policy = new LambdaEndpointPolicy<MqttTopic>((e, _) =>
{
if (e.Role == EndpointRole.System)
{
return;
}

if (!e.Subscriptions.Any())
{
return;
}

var configuration = new MqttSubscriberConfiguration(e);
configure(configuration);

configuration!.As<IDelayedEndpointConfiguration>().Apply();
});

_options.Policies.Add(policy);

return this.As<MqttTransportExpression>();
}
}
4 changes: 2 additions & 2 deletions src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static MqttTransportExpression UseMqtt(this WolverineOptions options,

transport.Options = builder.Build();

return new MqttTransportExpression(transport);
return new MqttTransportExpression(transport, options);
}

/// <summary>
Expand All @@ -50,7 +50,7 @@ public static MqttTransportExpression UseMqtt(this WolverineOptions options, Man

transport.Options = mqttOptions;

return new MqttTransportExpression(transport);
return new MqttTransportExpression(transport, options);
}

/// <summary>
Expand Down

0 comments on commit 5d9cf21

Please sign in to comment.