Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the persisted envelope count metrics back in. Closes GH-357 #1193

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion docs/guide/durability/leadership-and-troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ builder.UseWolverine(opts =>
}
});

using var host = builder.Build();
using var host = builder.Build();
await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/DurabilityModes.cs#L63-L90' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_the_solo_mode' title='Start of snippet'>anchor</a></sup>
Expand All @@ -72,3 +72,41 @@ await host.StartAsync();
Running your Wolverine application like this means that Wolverine is able to more quickly start the transactional inbox
and outbox at start up time, and also to immediately recover any persisted incoming or outgoing messages from the previous
execution of the service on your local development box.

## Metrics <Badge type="tip" text="3.6" />

::: tip
These metrics can be used to detect when a Wolverine system is in distress, which is indicated by these numbers growing larger over time
:::

Wolverine emits observable gauge metrics for the number of persisted inbox, outbox, and scheduled messages:

1. `wolverine-inbox-count` - number of persisted, `Incoming` envelopes in the durable inbox
2. `wolverine-outbox-count` - number of persisted, `Outgoing` envelopes in the durable outbox
3. `wolverine-scheduled-count` - number of persisted, `Scheduled` envelopes in the durable inbox

In all cases, if you are using some sort of multi-tenancy where envelopes are stored in separate databases per tenant,
the metric names above will be suffixed with ".[database name]".

You can disable or modify the polling of these metrics with these settings:

<!-- snippet: sample_configuring_persistence_metrics -->
<a id='snippet-sample_configuring_persistence_metrics'></a>
```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// This does assume that you have *some* kind of message
// persistence set up

// This is enabled by default, but just showing that
// you *could* disable it
opts.Durability.DurabilityMetricsEnabled = true;

// The default is 5 seconds, but maybe you want it slower
// because this does have to do a non-trivial query
opts.Durability.UpdateMetricsPeriod = 10.Seconds();
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs#L209-L226' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_persistence_metrics' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
22 changes: 22 additions & 0 deletions src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,26 @@ public static async Task configure_inbox_keeping()

#endregion
}

/// <summary>
/// Documentation sample: builds and starts a host with Wolverine configured so that
/// the durability metrics polling is explicitly enabled and the polling period is
/// set to 10 seconds. The code between the region markers is the published snippet
/// named "sample_configuring_persistence_metrics".
/// </summary>
public static async Task configure_persistence_metrics()
{
#region sample_configuring_persistence_metrics

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// This does assume that you have *some* kind of message
// persistence set up

// This is enabled by default, but just showing that
// you *could* disable it
opts.Durability.DurabilityMetricsEnabled = true;

// The default is 5 seconds, but maybe you want it slower
// because this does have to do a non-trivial query
opts.Durability.UpdateMetricsPeriod = 10.Seconds();
}).StartAsync();

#endregion
}
}
10 changes: 10 additions & 0 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.Persistence;
using Wolverine.RDBMS.Durability;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime;
Expand All @@ -24,6 +25,7 @@ internal class DurabilityAgent : IAgent
private readonly ILogger<DurabilityAgent> _logger;
private Timer? _scheduledJobTimer;
private Timer? _recoveryTimer;
private PersistenceMetrics _metrics;

public DurabilityAgent(string databaseName, IWolverineRuntime runtime, IMessageDatabase database)
{
Expand Down Expand Up @@ -77,6 +79,13 @@ public static Uri AddMarkerType(Uri uri, Type markerType)

public Task StartAsync(CancellationToken cancellationToken)
{
_metrics = new PersistenceMetrics(_runtime.Meter, _settings, _database.Name);

if (_settings.DurabilityMetricsEnabled)
{
_metrics.StartPolling(_runtime.LoggerFactory.CreateLogger<PersistenceMetrics>(), _database);
}

var recoveryStart = _settings.ScheduledJobFirstExecution.Add(new Random().Next(0, 1000).Milliseconds());

_recoveryTimer = new Timer(_ =>
Expand Down Expand Up @@ -112,6 +121,7 @@ public void StartScheduledJobPolling()
public async Task StopAsync(CancellationToken cancellationToken)
{
_runningBlock.Complete();
_metrics.SafeDispose();

if (_scheduledJobTimer != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Logging;
using Raven.Client.Documents;
using Wolverine.Persistence;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.Handlers;
Expand All @@ -10,21 +11,6 @@

namespace Wolverine.RavenDb.Internals.Durability;


// TODO -- use a subscription for replayable dlq messages
// TODO -- try to use RavenDb's internal document expiry for expired envelopes
// TODO -- use a subscription on the leader for outgoing messages marked as any node?

/*
var operations = new IDatabaseOperation[]
{
new DeleteExpiredEnvelopesOperation(
new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow),
new MoveReplayableErrorMessagesToIncomingOperation(_database)
};

*/

public partial class RavenDbDurabilityAgent : IAgent
{
private readonly IDocumentStore _store;
Expand All @@ -40,6 +26,7 @@ public partial class RavenDbDurabilityAgent : IAgent

private readonly CancellationTokenSource _cancellation = new();
private readonly CancellationTokenSource _combined;
private PersistenceMetrics _metrics;

public RavenDbDurabilityAgent(IDocumentStore store, IWolverineRuntime runtime, RavenDbMessageStore parent)
{
Expand Down Expand Up @@ -67,6 +54,13 @@ public Task StartAsync(CancellationToken cancellationToken)

internal void StartTimers()
{
_metrics = new PersistenceMetrics(_runtime.Meter, _settings, null);

if (_settings.DurabilityMetricsEnabled)
{
_metrics.StartPolling(_runtime.LoggerFactory.CreateLogger<PersistenceMetrics>(), _parent);
}

var recoveryStart = _settings.ScheduledJobFirstExecution.Add(new Random().Next(0, 1000).Milliseconds());

_recoveryTask = Task.Run(async () =>
Expand Down Expand Up @@ -101,6 +95,11 @@ public Task StopAsync(CancellationToken cancellationToken)
{
_cancellation.Cancel();

if (_metrics != null)
{
_metrics.SafeDispose();
}

if (_recoveryTask != null)
{
_recoveryTask.SafeDispose();
Expand Down
10 changes: 10 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ public class DurabilitySettings
/// </summary>
public TimeSpan TenantCheckPeriod { get; set; } = 5.Seconds();

/// <summary>
/// When any kind of message persistence is in use, this is the polling interval
/// at which the metrics for the persisted envelope counts are refreshed.
/// Default is 5 seconds.
/// </summary>
public TimeSpan UpdateMetricsPeriod { get; set; } = 5.Seconds();

/// <summary>
/// Controls whether the polling for durability metrics is enabled.
/// Default is true.
/// </summary>
public bool DurabilityMetricsEnabled { get; set; } = true;

/// <summary>
/// Get or set the logical Wolverine service name. By default, this is
Expand Down
80 changes: 80 additions & 0 deletions src/Wolverine/Persistence/PersistenceMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.Diagnostics.Metrics;
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;

namespace Wolverine.Persistence;

/// <summary>
/// Registers observable gauges for the persisted inbox, outbox, and scheduled
/// envelope counts, and can poll an <see cref="IMessageStore"/> on a background
/// task to keep <see cref="Counts"/> up to date.
/// </summary>
public class PersistenceMetrics : IDisposable
{
    private readonly DurabilitySettings _settings;

    // The gauges are kept in fields so the instruments stay referenced for the
    // lifetime of this object; their values are read through the callbacks below.
    private readonly ObservableGauge<int> _incoming;
    private readonly ObservableGauge<int> _outgoing;
    private readonly ObservableGauge<int> _scheduled;

    private readonly CancellationTokenSource _cancellation;

    // Captured once up front: reading CancellationTokenSource.Token after the
    // source has been disposed throws, whereas a captured token stays usable.
    private readonly CancellationToken _token;

    private Task? _task;
    private bool _disposed;

    /// <summary>
    /// Creates the three gauges on <paramref name="meter"/>.
    /// </summary>
    /// <param name="meter">The meter the gauges are created on.</param>
    /// <param name="settings">
    /// Supplies the polling period and the cancellation token that the polling
    /// loop is linked to.
    /// </param>
    /// <param name="databaseName">
    /// Optional database name. When not empty, each metric name is suffixed with
    /// ".[database name]" and each description with " for database [database name]".
    /// </param>
    public PersistenceMetrics(Meter meter, DurabilitySettings settings, string? databaseName)
    {
        _settings = settings;
        _cancellation = CancellationTokenSource.CreateLinkedTokenSource(settings.Cancellation);
        _token = _cancellation.Token;

        var hasDatabaseName = !databaseName.IsEmpty();
        var nameSuffix = hasDatabaseName ? "." + databaseName : string.Empty;
        var descriptionSuffix = hasDatabaseName ? " for database " + databaseName : string.Empty;

        _incoming = meter.CreateObservableGauge(MetricsConstants.InboxCount + nameSuffix, () => Counts.Incoming,
            MetricsConstants.Messages, "Inbox messages" + descriptionSuffix);
        _outgoing = meter.CreateObservableGauge(MetricsConstants.OutboxCount + nameSuffix, () => Counts.Outgoing,
            MetricsConstants.Messages, "Outbox messages" + descriptionSuffix);
        _scheduled = meter.CreateObservableGauge(MetricsConstants.ScheduledCount + nameSuffix, () => Counts.Scheduled,
            MetricsConstants.Messages, "Scheduled messages" + descriptionSuffix);
    }

    /// <summary>
    /// The most recently fetched counts. The gauges report the values held here.
    /// </summary>
    public PersistedCounts Counts { get; set; } = new();

    /// <summary>
    /// Starts a background task that fetches the persisted counts from
    /// <paramref name="store"/> and then waits for the configured
    /// <see cref="DurabilitySettings.UpdateMetricsPeriod"/> before fetching again,
    /// until this object is disposed or the linked settings token is cancelled.
    /// </summary>
    /// <param name="logger">Receives an error entry whenever a fetch fails.</param>
    /// <param name="store">The message store whose counts are polled.</param>
    public void StartPolling(ILogger logger, IMessageStore store)
    {
        _task = Task.Run(async () =>
        {
            using var timer = new PeriodicTimer(_settings.UpdateMetricsPeriod);

            while (!_token.IsCancellationRequested)
            {
                try
                {
                    Counts = await store.Admin.FetchCountsAsync();
                }
                catch (OperationCanceledException)
                {
                    // Also covers TaskCanceledException, which derives from it.
                    if (_token.IsCancellationRequested)
                    {
                        break;
                    }

                    // A cancellation that was not our shutdown (for example a
                    // timed-out query): fall through and wait for the next tick
                    // instead of retrying immediately.
                }
                catch (Exception e)
                {
                    logger.LogError(e, "Error trying to update the metrics on envelope storage for {Store}", store);
                }

                try
                {
                    if (!await timer.WaitForNextTickAsync(_token))
                    {
                        break;
                    }
                }
                catch (OperationCanceledException)
                {
                    // Shutdown was requested while waiting for the next tick.
                    break;
                }
            }
        });
    }

    /// <summary>
    /// Stops the polling loop and releases the linked cancellation source.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        _cancellation.Cancel();

        // A linked source stays registered on the parent token until it is
        // disposed, so skipping this would leak the registration.
        _cancellation.Dispose();

        _task?.SafeDispose();
    }
}
Loading