Skip to content

Commit

Permalink
Merge pull request #24 from atidev/rewrite-metricsfactory-to-di
Browse files Browse the repository at this point in the history
metricsFactory from DI
  • Loading branch information
CptnSnail authored Mar 28, 2024
2 parents 030fb63 + 420b6cd commit 2fa7a3a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
2 changes: 1 addition & 1 deletion ATI.Services.RabbitMQ/ATI.Services.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<NoWarn>1701;1702;CS1591;CS1571;CS1573;CS1574</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="atisu.services.common" Version="13.7.2" />
<PackageReference Include="atisu.services.common" Version="15.0.0" />
<PackageReference Include="EasyNetQ" Version="7.4.3" />
</ItemGroup>
</Project>
23 changes: 15 additions & 8 deletions ATI.Services.RabbitMQ/EventbusManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class EventbusManager : IDisposable, IInitializer
private readonly JsonSerializer _jsonSerializer;
private readonly string _connectionString;

private readonly MetricsFactory _inMetricsFactory = MetricsFactory.CreateRabbitMqMetricsFactory(RabbitMetricsType.Subscribe, nameof(EventbusManager), additionalSummaryLabels: "rmq_app_id");
private readonly MetricsFactory _outMetricsFactory = MetricsFactory.CreateRabbitMqMetricsFactory(RabbitMetricsType.Publish, nameof(EventbusManager));
private readonly MetricsInstance _inMetrics;
private readonly MetricsInstance _outMetrics;

private readonly ILogger _logger = LogManager.GetCurrentClassLogger();
private ConcurrentBag<SubscriptionInfo> _subscriptions = new();
Expand All @@ -52,12 +52,19 @@ public class EventbusManager : IDisposable, IInitializer
private static readonly UTF8Encoding BodyEncoding = new(false);
private readonly RmqTopology _rmqTopology;

public EventbusManager(JsonSerializer jsonSerializer, IOptions<EventbusOptions> options, RmqTopology rmqTopology)
public EventbusManager(
JsonSerializer jsonSerializer,
IOptions<EventbusOptions> options,
RmqTopology rmqTopology,
MetricsFactory metricsFactory)
{
_options = options.Value;
_connectionString = options.Value.ConnectionString;
_jsonSerializer = jsonSerializer;
_rmqTopology = rmqTopology;

_inMetrics = metricsFactory.CreateRabbitMqMetricsFactory(RabbitMetricsType.Subscribe, nameof(EventbusManager), additionalSummaryLabels: "rmq_app_id");
_outMetrics = metricsFactory.CreateRabbitMqMetricsFactory(RabbitMetricsType.Publish, nameof(EventbusManager));

_subscribePolicy = Policy.Handle<Exception>()
.WaitAndRetryForeverAsync(_ => _options.RabbitConnectInterval,
Expand Down Expand Up @@ -113,7 +120,7 @@ public async Task PublishRawAsync(
string.IsNullOrWhiteSpace(publishBody))
return;

using (_outMetricsFactory.CreateLoggingMetricsTimer(metricEntity, $"{exchangeName}:{routingKey}"))
using (_outMetrics.CreateLoggingMetricsTimer(metricEntity, $"{exchangeName}:{routingKey}"))
{
var messageProperties = GetProperties(additionalHeaders, withAcceptLang);
var exchange = new Exchange(exchangeName);
Expand Down Expand Up @@ -150,7 +157,7 @@ public async Task PublishAsync<T>(
publishObject == null)
return;

using (_outMetricsFactory.CreateLoggingMetricsTimer(metricEntity, $"{exchangeName}:{routingKey}"))
using (_outMetrics.CreateLoggingMetricsTimer(metricEntity, $"{exchangeName}:{routingKey}"))
{
var messageProperties = GetProperties(additionalHeaders, withAcceptLang);
var exchange = new Exchange(exchangeName);
Expand Down Expand Up @@ -297,7 +304,7 @@ private async Task<IDisposable> SubscribePrivateAsync(

async Task HandleEventBusMessageWithPolicy(ReadOnlyMemory<byte> body, MessageProperties props, MessageReceivedInfo info)
{
using (_inMetricsFactory.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
using (_inMetrics.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
$"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
{
Expand Down Expand Up @@ -329,7 +336,7 @@ private async Task BindConsumerAsync(QueueExchangeBinding mainQueueBinding,
{
return async (body, props, info) =>
{
using (_inMetricsFactory.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
using (_inMetrics.CreateLoggingMetricsTimer(metricEntity ?? "Eventbus",
$"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
{
Expand Down Expand Up @@ -362,7 +369,7 @@ Func<ReadOnlyMemory<byte>, MessageProperties, MessageReceivedInfo, Task> HandleP
{
return async (body, props, info) =>
{
using (_outMetricsFactory.CreateLoggingMetricsTimer($"{metricEntity ?? "Eventbus"}-Poison", $"{info.Exchange}:{info.RoutingKey}",
using (_outMetrics.CreateLoggingMetricsTimer($"{metricEntity ?? "Eventbus"}-Poison", $"{info.Exchange}:{info.RoutingKey}",
additionalLabels: props.AppId ?? "Unknown"))
{
HandleMessageProps(props);
Expand Down

0 comments on commit 2fa7a3a

Please sign in to comment.