Skip to content

Commit

Permalink
refactored startup, added serilog
Browse files Browse the repository at this point in the history
  • Loading branch information
janzenisek committed Feb 22, 2024
1 parent ebc7d23 commit 6a7b89b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 59 deletions.
148 changes: 92 additions & 56 deletions src/Communication/Broker/MqttBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,54 @@
using Microsoft.Extensions.Hosting;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Serilog;
using Microsoft.Extensions.Configuration;

// https://blog.behroozbc.ir/c-mqtt-broker-using-mqttnet-version-4
namespace Ai.Hgb.Dat.Communication {
public sealed class MqttBroker : IBroker {
public HostAddress Address {
get { return address; }
private set { if (!value.Equals(address)) address = value; }
set { address = value; }
}
public int WebsocketPort {
get { return websocketPort; }
set { websocketPort = value; }
}

public string WebsocketPattern {
get { return websocketPattern; }
set { websocketPattern = value; }
}

public bool MqttEnabled {
get { return mqttEnabled; }
set { mqttEnabled = value; }
}

public bool ConsoleLogging {
get => consoleLogging;
set => consoleLogging = value;
public bool WebsocketEnabled {
get { return websocketEnabled; }
set { websocketEnabled = value; }
}

public Microsoft.Extensions.Configuration.IConfiguration LogConfig { get; set; }

private HostAddress address;
private int websocketPort;
private string websocketPattern;
private bool mqttEnabled, websocketEnabled;

private MqttServer server;
private bool consoleLogging;
private WebApplication webapp;
private CancellationTokenSource webappCts;


public MqttBroker(HostAddress address, bool consoleLogging = false) {
public MqttBroker(HostAddress address, bool mqttEnabled = true, bool websocketEnabled = true, int websocketPort = 5000, string websocketPattern = "mqtt") {
this.address = address;
this.consoleLogging = consoleLogging;
this.mqttEnabled = mqttEnabled;
this.websocketEnabled = websocketEnabled;
this.websocketPort = websocketPort;
this.websocketPattern = websocketPattern;
}

public void Dispose() {
Expand All @@ -38,23 +65,64 @@ public void Dispose() {

public IBroker StartUp() {
var t = StartUpAsync();
return this;
}

public Task StartUpAsync() {
return StartUpServerAsync(mqttEnabled, websocketEnabled);
}

public void TearDown() {
var t = TearDownAsync();

//if (consoleLogging) Console.WriteLine("Shutdown");
webappCts.Cancel();
webapp.WaitForShutdown();

t.Wait();
}

return this;
public Task TearDownAsync() {
return server.StopAsync();
}

private Task StartUpMqttAsync() {
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.WithPersistentSessions() // enables QOS-Level 3
.WithDefaultEndpointPort(Address.Port);


private WebApplication webapp;
private CancellationTokenSource webappCts;
public void StartUpWebsocket() {
server = new MqttFactory().CreateMqttServer(optionsBuilder.Build());
server.InterceptingSubscriptionAsync += Server_InterceptingSubscriptionAsync;
server.InterceptingPublishAsync += Server_InterceptingPublishAsync;
server.ClientConnectedAsync += Server_ClientConnectedAsync;
server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;
server.StartedAsync += Server_StartedAsync;
server.StoppedAsync += Server_StoppedAsync;

return server.StartAsync();
}

private Task StartUpServerAsync(bool mqttEnabled = true, bool websocketEnabled = true) {
webappCts = new CancellationTokenSource();
var builder = WebApplication.CreateBuilder();
builder.WebHost.UseKestrel(o =>
{
o.ListenAnyIP(1883, l => l.UseMqtt());
o.ListenAnyIP(5000);
o.ListenAnyIP(address.Port, l => l.UseMqtt());
if(websocketEnabled) o.ListenAnyIP(websocketPort);
});

// setup logger
if(LogConfig != null) {
Log.Logger = new LoggerConfiguration()
.ReadFrom.Configuration(LogConfig).CreateLogger();
} else {
Log.Logger = new LoggerConfiguration()
.WriteTo.Console().CreateLogger();
}
builder.Host.UseSerilog();

var optionsBuilder = new MqttServerOptionsBuilder()
//.WithDefaultEndpoint()
//.WithPersistentSessions() // enables QOS-Level 3
Expand All @@ -79,7 +147,7 @@ public void StartUpWebsocket() {
webapp.UseEndpoints(endpoints =>
{
//endpoints.MapMqtt("");
endpoints.MapMqtt("/mqtt");
if (websocketEnabled) endpoints.MapMqtt($"/{websocketPattern}");
//endpoints.MapConnectionHandler<MqttConnectionHandler>(
// "/mqtt",
// httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector = protocolList => protocolList.FirstOrDefault() ?? string.Empty);
Expand All @@ -88,74 +156,42 @@ public void StartUpWebsocket() {

//webapp.UseMqttServer(s => { });

webapp.RunAsync(webappCts.Token);
return webapp.RunAsync(webappCts.Token);
}

public Task StartUpAsync() {

StartUpWebsocket();
return Task.CompletedTask;

//var optionsBuilder = new MqttServerOptionsBuilder()
// .WithDefaultEndpoint()
// .WithPersistentSessions() // enables QOS-Level 3
// .WithDefaultEndpointPort(Address.Port);


//server = new MqttFactory().CreateMqttServer(optionsBuilder.Build());
//server.InterceptingSubscriptionAsync += Server_InterceptingSubscriptionAsync;
//server.InterceptingPublishAsync += Server_InterceptingPublishAsync;
//server.ClientConnectedAsync += Server_ClientConnectedAsync;
//server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;
//server.StartedAsync += Server_StartedAsync;
//server.StoppedAsync += Server_StoppedAsync;

//return server.StartAsync();
}


public void TearDown() {
var t = TearDownAsync();

if (consoleLogging) Console.WriteLine("Shutdown");
webappCts.Cancel();
webapp.WaitForShutdown();

t.Wait();
}

public Task TearDownAsync() {
return server.StopAsync();
}
#region event monitoring/interception

private Task Server_StartedAsync(EventArgs arg) {
if (consoleLogging) Console.WriteLine($"MqttBroker: Broker started.");
Log.Information($"Broker started.");
return Task.CompletedTask;
}

private Task Server_StoppedAsync(EventArgs arg) {
if (consoleLogging) Console.WriteLine($"MqttBroker: Broker stopped.");
Log.Information($"Broker stopped.");
return Task.CompletedTask;
}

private Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg) {
if (consoleLogging) Console.WriteLine($"MqttBroker: Client {arg.ClientId} connected.");
Log.Information($"Client {arg.ClientId} connected.");
return Task.CompletedTask;
}

private Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) {
if (consoleLogging) Console.WriteLine($"MqttBroker: Client {arg.ClientId} disconnected.");
Log.Information($"Client {arg.ClientId} disconnected.");
return Task.CompletedTask;
}

private Task Server_InterceptingSubscriptionAsync(InterceptingSubscriptionEventArgs arg) {
if (consoleLogging) Console.WriteLine($"MqttBroker: Client {arg.ClientId} subscribed to topic {arg.TopicFilter.Topic}.");
Log.Information($"Client {arg.ClientId} subscribed to topic {arg.TopicFilter.Topic}.");
return Task.CompletedTask;
}

private Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg) {
if (consoleLogging) Console.WriteLine($"MqttBroker: Client {arg.ClientId} sends message to topic {arg.ApplicationMessage.Topic}.");
Log.Information($"Client {arg.ClientId} sends message to topic {arg.ApplicationMessage.Topic}.");
return Task.CompletedTask;
}

#endregion event monitoring/interception
}
}
1 change: 1 addition & 0 deletions src/Communication/Communication.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="MQTTnet" Version="4.2.1.781" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.2.1.781" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.2.1.781" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
<PackageReference Include="YamlDotNet" Version="13.1.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/DemoApp/PerformanceTestSuite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public static double RunTest_MQTTSocket_PublishSubscribe(int runCount, IPayloadC

HostAddress address = new HostAddress("127.0.0.1", 1883);
MqttBroker broker = new MqttBroker(address);
broker.StartUpWebsocket();
broker.StartUp();
Task.Delay(1000).Wait();

string pubsubTopic = "demoapp/performance/mqtt/pubsubtest";
Expand Down
4 changes: 2 additions & 2 deletions src/DemoApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public static void RunDemo_Mqtt_DocProducerConsumer() {

MqttBroker broker = new MqttBroker(address);
//broker.StartUp();
broker.StartUpWebsocket();
broker.StartUp();

string pubsubTopic = "demoapp/docs";
string respTopic = "demoapp/responses";
Expand Down Expand Up @@ -407,7 +407,7 @@ public static void RunDemo_MQTTandWS_StreamDoubleData() {

MqttBroker broker = new MqttBroker(address);
//broker.StartUp();
broker.StartUpWebsocket();
broker.StartUp();

ISocket socket = new MqttSocket("socket1", "socket1", address, converter, connect: true);
string group = Guid.NewGuid().ToString();
Expand Down

0 comments on commit 6a7b89b

Please sign in to comment.