Skip to content

Commit

Permalink
Let all services report message_processing timings (#941)
Browse files Browse the repository at this point in the history
Fixes #940
  • Loading branch information
rngcntr authored Aug 24, 2023
1 parent 6771183 commit 86d9726
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 40 deletions.
2 changes: 1 addition & 1 deletion shared.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<Version>0.12.0</Version>
<Version>0.13.0</Version>
<LangVersion>11</LangVersion>
<Nullable>enable</Nullable>
<WarningsAsErrors>CS8600;CS8602;CS8625;CS8618;CS8604;CS8601</WarningsAsErrors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@ namespace Motor.Extensions.Diagnostics.Metrics;
public class PrometheusDelegatingMessageHandler<TInput> : DelegatingMessageHandler<TInput>
where TInput : class
{
private readonly IMetricFamily<ICounter> _messageProcessingTotal;
private readonly IMetricFamily<ISummary> _messageProcessing;

public PrometheusDelegatingMessageHandler(
IMetricsFactory<PrometheusDelegatingMessageHandler<TInput>> metricsFactory)
public PrometheusDelegatingMessageHandler(IMetricsFactory<PrometheusDelegatingMessageHandler<TInput>> metricsFactory)
{
_messageProcessingTotal =
metricsFactory.CreateCounter("message_processing_total", "Message processing status total", false, "status");
_messageProcessing =
metricsFactory.CreateSummary("message_processing", "Message processing duration in ms", false, "status");
}

public override async Task<ProcessedMessageStatus> HandleMessageAsync(MotorCloudEvent<TInput> dataCloudEvent,
CancellationToken token = default)
{
var processedMessageStatus = ProcessedMessageStatus.CriticalFailure;
using (new AutoIncCounter(() => _messageProcessingTotal.WithLabels(processedMessageStatus.ToString())))
using (new AutoObserveStopwatch(() => _messageProcessing.WithLabels(processedMessageStatus.ToString())))
{
processedMessageStatus = await base.HandleMessageAsync(dataCloudEvent, token);
}
Expand Down
19 changes: 5 additions & 14 deletions src/Motor.Extensions.Hosting/MultiOutputServiceAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Motor.Extensions.Diagnostics.Metrics;
using Motor.Extensions.Diagnostics.Metrics.Abstractions;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Prometheus.Client;

namespace Motor.Extensions.Hosting;

Expand All @@ -16,35 +13,29 @@ public class MultiOutputServiceAdapter<TInput, TOutput> : INoOutputService<TInpu
{
private readonly IMultiOutputService<TInput, TOutput> _converter;
private readonly ILogger<SingleOutputServiceAdapter<TInput, TOutput>> _logger;
private readonly ISummary? _messageProcessing;
private readonly ITypedMessagePublisher<TOutput> _publisher;

public MultiOutputServiceAdapter(ILogger<SingleOutputServiceAdapter<TInput, TOutput>> logger,
IMetricsFactory<SingleOutputServiceAdapter<TInput, TOutput>>? metrics,
IMultiOutputService<TInput, TOutput> converter,
ITypedMessagePublisher<TOutput> publisher)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_converter = converter ?? throw new ArgumentNullException(nameof(converter));
_publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
_messageProcessing = metrics?.CreateSummary("message_processing", "Message processing duration in ms");
}

public async Task<ProcessedMessageStatus> HandleMessageAsync(MotorCloudEvent<TInput> dataCloudEvent,
CancellationToken token = default)
{
try
{
using (new AutoObserveStopwatch(() => _messageProcessing))
await foreach (var message in _converter.ConvertMessageAsync(dataCloudEvent, token)
.ConfigureAwait(false).WithCancellation(token))
{
await foreach (var message in _converter.ConvertMessageAsync(dataCloudEvent, token)
.ConfigureAwait(false).WithCancellation(token))
if (message?.Data is not null)
{
if (message?.Data is not null)
{
await _publisher.PublishMessageAsync(message, token)
.ConfigureAwait(false);
}
await _publisher.PublishMessageAsync(message, token)
.ConfigureAwait(false);
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/Motor.Extensions.Hosting/SingleOutputServiceAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ public class SingleOutputServiceAdapter<TInput, TOutput> : MultiOutputServiceAda
where TOutput : class
{
public SingleOutputServiceAdapter(ILogger<SingleOutputServiceAdapter<TInput, TOutput>> logger,
IMetricsFactory<SingleOutputServiceAdapter<TInput, TOutput>> metrics, ISingleOutputService<TInput, TOutput> service,
ITypedMessagePublisher<TOutput> publisher) :
base(logger, metrics, new SingleToMultiOutputAdapter<TInput, TOutput>(service), publisher)
ISingleOutputService<TInput, TOutput> service, ITypedMessagePublisher<TOutput> publisher) :
base(logger, new SingleToMultiOutputAdapter<TInput, TOutput>(service), publisher)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Moq;
using Motor.Extensions.Diagnostics.Metrics;
using Motor.Extensions.Diagnostics.Metrics.Abstractions;
using Xunit;

namespace Motor.Extensions.Diagnostics.Metrics_UnitTest;

public class PrometheusDelegatingMessageHandlerTests
{
[Fact]
public void Ctor_WithMetricsFactory_SummaryIsCreated()
{
var metricsFactoryMock = new Mock<IMetricsFactory<PrometheusDelegatingMessageHandler<string>>>();

var _ = new PrometheusDelegatingMessageHandler<string>(metricsFactoryMock.Object);

metricsFactoryMock.Verify(x =>
x.CreateSummary("message_processing", "Message processing duration in ms",
false, "status")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,6 @@ public class MultiOutputServiceAdapterTests

private static Mock<ITypedMessagePublisher<string>> FakePublisher => new();


[Fact]
public void Ctor_WithMetricsFactory_SummaryIsCreated()
{
var metricsFactoryMock = new Mock<IMetricsFactory<SingleOutputServiceAdapter<string, string>>>();

GetMessageHandler(metrics: metricsFactoryMock.Object);

metricsFactoryMock.Verify(x =>
x.CreateSummary("message_processing", "Message processing duration in ms",
false, null as IReadOnlyList<QuantileEpsilonPair>, null, null, null)
);
}

[Fact]
public async Task HandleMessageAsync_WithContextAndInput_HasContext()
{
Expand Down Expand Up @@ -180,15 +166,14 @@ private async IAsyncEnumerable<MotorCloudEvent<string>> CreateReturnValues(param

private MultiOutputServiceAdapter<string, string> GetMessageHandler(
ILogger<SingleOutputServiceAdapter<string, string>>? logger = null,
IMetricsFactory<SingleOutputServiceAdapter<string, string>>? metrics = null,
IMultiOutputService<string, string>? service = null,
ITypedMessagePublisher<string>? publisher = null)
{
logger ??= Mock.Of<ILogger<SingleOutputServiceAdapter<string, string>>>();
service ??= FakeService.Object;
publisher ??= FakePublisher.Object;

return new MultiOutputServiceAdapter<string, string>(logger, metrics, service, publisher);
return new MultiOutputServiceAdapter<string, string>(logger, service, publisher);
}

private static MotorCloudEvent<string> CreateMotorEvent(string data = "")
Expand Down

0 comments on commit 86d9726

Please sign in to comment.