Skip to content

Commit

Permalink
Using Thread instead of Task for background reader
Browse files Browse the repository at this point in the history
  • Loading branch information
DrEsteban committed Nov 15, 2024
1 parent cf33acc commit a6f0190
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions BambuVideoStream/BambuStreamBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ public BambuStreamBackgroundService(
this.hostLifetime = hostLifetime;
this.mqttProcessingChannel = Channel.CreateBounded<MqttApplicationMessageReceivedEventArgs>(
new BoundedChannelOptions(5) // Max 5 messages in queue
{
{
SingleReader = true,
FullMode = BoundedChannelFullMode.DropOldest,
AllowSynchronousContinuations = true
AllowSynchronousContinuations = false
});
}

Expand Down Expand Up @@ -151,19 +151,33 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
await this.mqttClient.SubscribeAsync(this.mqttSubscribeOptions, stoppingToken);

// Start processing messages
Task.Run(async () =>
new Thread(async () =>
{
await foreach (var e in this.mqttProcessingChannel.Reader.ReadAllAsync())
try
{
await foreach (var e in this.mqttProcessingChannel.Reader.ReadAllAsync(stoppingToken))
{
try
{
this.ProcessBambuMessage(e);
// Super small delay to prevent bombarding OBS
await Task.Delay(10, stoppingToken);
}
catch { } // Method logs all exceptions
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
this.log.LogError(ex, "Unexpected error in reader thread");
}
finally
{
try
{
this.ProcessBambuMessage(e);
// Super small delay to prevent bombarding OBS
await Task.Delay(10, stoppingToken);
}
catch { } // Method logs all exceptions
this.log.LogDebug("Reader thread stopped");
this.hostLifetime.StopApplication();
}
}).Forget();
}).Start();
stoppingToken.Register(() => this.mqttProcessingChannel.Writer.Complete());

// Wait for the application to stop
Expand Down Expand Up @@ -462,7 +476,7 @@ private void ProcessBambuMessage(MqttApplicationMessageReceivedEventArgs e)
break;
}
}
catch (ObjectDisposedException)
catch (Exception err) when (err is ObjectDisposedException or OperationCanceledException)
{
// Do nothing. This is expected when the service is shutting down.
}
Expand Down

0 comments on commit a6f0190

Please sign in to comment.