Added improved Otel emitting for Wolverine endpoint events. Closes GH-3 #1192

Merged 1 commit on Dec 30, 2024
22 changes: 20 additions & 2 deletions docs/guide/durability/marten/event-sourcing.md
@@ -468,7 +468,7 @@ public static Task<Order> update_and_get_latest(IMessageBus bus, MarkItemReady c
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/OrderEventSourcingSample/Alternatives/Signatures.cs#L103-L113' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_updatedaggregate_with_invoke_async' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Likewise, you can use `UpdatedAggregate` as the response body of an HTTP endpoint with Wolverine.HTTP [as shown here](/guide/http/marten.html#responding-with-the-updated-aggregate).

::: info
@@ -484,6 +484,24 @@ within the aggregate handler workflow as well. If you have a command handler met
the `[Aggregate]` attribute in HTTP usage, you can also pass the aggregate type as an argument to any `Before` / `LoadAsync` / `Validate`
method on that handler to do validation before the main handler method. Here's a sample from the tests of doing just that:

-snippet: sample_passing_aggregate_into_validate_method
<!-- snippet: sample_passing_aggregate_into_validate_method -->
<a id='snippet-sample_passing_aggregate_into_validate_method'></a>
```cs
public record RaiseIfValidated(Guid LetterAggregateId);

public static class RaiseIfValidatedHandler
{
    public static HandlerContinuation Validate(LetterAggregate aggregate) =>
        aggregate.ACount == 0 ? HandlerContinuation.Continue : HandlerContinuation.Stop;

    [AggregateHandler]
    public static IEnumerable<object> Handle(RaiseIfValidated command, LetterAggregate aggregate)
    {
        yield return new BEvent();
    }
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/MartenTests/aggregate_handler_workflow.cs#L406-L422' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_passing_aggregate_into_validate_method' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->


8 changes: 4 additions & 4 deletions docs/guide/handlers/sticky.md
@@ -25,7 +25,7 @@ message as an input.
```cs
public class StickyMessage;
```
-<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L216-L220' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_stickymessage' title='Start of snippet'>anchor</a></sup>
+<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L240-L244' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_stickymessage' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And we're going to handle that `StickyMessage` message separately with two different handler types:
@@ -51,7 +51,7 @@ public static class GreenStickyHandler
}
}
```
-<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L222-L242' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_sticky_handler_attribute' title='Start of snippet'>anchor</a></sup>
+<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L246-L266' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_sticky_handler_attribute' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: tip
@@ -79,7 +79,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.ListenAtPort(4000).Named("blue");
}).StartAsync();
```
-<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L154-L164' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_named_listener_endpoint' title='Start of snippet'>anchor</a></sup>
+<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L178-L188' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_named_listener_endpoint' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With all of that being said, the end result of the two `StickyMessage` handlers that are marked with `[StickyHandler]`
@@ -119,7 +119,7 @@ using var host = await Host.CreateDefaultBuilder()

}).StartAsync();
```
-<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L169-L187' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sticky_handlers_by_endpoint_with_fluent_interface' title='Start of snippet'>anchor</a></sup>
+<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L193-L211' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sticky_handlers_by_endpoint_with_fluent_interface' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->


101 changes: 101 additions & 0 deletions docs/guide/logging.md
@@ -254,6 +254,107 @@ using var host = await Host.CreateDefaultBuilder()
Note that this `TelemetryEnabled()` method is available on all possible subscriber and listener types within Wolverine.
This flag applies to all messages sent, received, or executed at a particular endpoint.

Wolverine endeavors to publish OpenTelemetry spans or activities for meaningful actions within a Wolverine application. Here
are the specific span names, activity names, and tag names emitted by Wolverine:

<!-- snippet: sample_wolverine_open_telemetry_tracing_spans_and_activities -->
<a id='snippet-sample_wolverine_open_telemetry_tracing_spans_and_activities'></a>
```cs
/// <summary>
/// ActivityEvent marking when an incoming envelope is discarded
/// </summary>
public const string EnvelopeDiscarded = "wolverine.envelope.discarded";

/// <summary>
/// ActivityEvent marking when an incoming envelope is being moved to the error queue
/// </summary>
public const string MovedToErrorQueue = "wolverine.error.queued";

/// <summary>
/// ActivityEvent marking when an incoming envelope does not have a known message
/// handler and is being shunted to registered "NoHandler" actions
/// </summary>
public const string NoHandler = "wolverine.no.handler";

/// <summary>
/// ActivityEvent marking when a message failure is configured to pause the message listener
/// where the message was handled. This is tied to error handling policies
/// </summary>
public const string PausedListener = "wolverine.paused.listener";

/// <summary>
/// Span that is emitted when a listener circuit breaker determines that there are too many
/// failures and listening should be paused
/// </summary>
public const string CircuitBreakerTripped = "wolverine.circuit.breaker.triggered";

/// <summary>
/// Span emitted when a listening agent is started or restarted
/// </summary>
public const string StartingListener = "wolverine.starting.listener";

/// <summary>
/// Span emitted when a listening agent is stopping
/// </summary>
public const string StoppingListener = "wolverine.stopping.listener";

/// <summary>
/// Span emitted when a listening agent is being paused
/// </summary>
public const string PausingListener = "wolverine.pausing.listener";

/// <summary>
/// ActivityEvent marking that an incoming envelope is being requeued after a message
/// processing failure
/// </summary>
public const string EnvelopeRequeued = "wolverine.envelope.requeued";

/// <summary>
/// ActivityEvent marking that an incoming envelope is being retried after a message
/// processing failure
/// </summary>
public const string EnvelopeRetry = "wolverine.envelope.retried";

/// <summary>
/// ActivityEvent marking that an incoming envelope has been rescheduled for later
/// execution after a failure
/// </summary>
public const string ScheduledRetry = "wolverine.envelope.rescheduled";

/// <summary>
/// Tag name trying to explain why a sender or listener was stopped or paused
/// </summary>
public const string StopReason = "wolverine.stop.reason";

/// <summary>
/// The Wolverine Uri that identifies what sending or listening endpoint the activity
/// refers to
/// </summary>
public const string EndpointAddress = "wolverine.endpoint.address";

/// <summary>
/// A stop reason when back pressure policies call for a pause in processing in a single endpoint
/// </summary>
public const string TooBusy = "TooBusy";

/// <summary>
/// A span emitted when a sending agent for a specific endpoint is paused
/// </summary>
public const string SendingPaused = "wolverine.sending.pausing";

/// <summary>
/// A span emitted when a sending agent is resuming after having been paused
/// </summary>
public const string SendingResumed = "wolverine.sending.resumed";

/// <summary>
/// A stop reason when sending agents are paused after too many sender failures
/// </summary>
public const string TooManySenderFailures = "TooManySenderFailures";
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Wolverine/Runtime/WolverineTracing.cs#L25-L119' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_wolverine_open_telemetry_tracing_spans_and_activities' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
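
As a rough illustration of how these names could be consumed, the sketch below subscribes to an `ActivitySource` named "Wolverine" with a plain `System.Diagnostics.ActivityListener` and reads the endpoint address and stop reason tags. The source created here is only a stand-in so the sample runs without a Wolverine reference, and the `tcp://localhost:4000` address is a made-up value; in a real application Wolverine owns the source and emits the spans itself.

```cs
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Stand-in for Wolverine's own ActivitySource (assumption: recreated here
// only so the sketch is self-contained)
using var source = new ActivitySource("Wolverine");

var observed = new List<string>();

using var listener = new ActivityListener
{
    // Subscribe only to the "Wolverine" source
    ShouldListenTo = s => s.Name == "Wolverine",
    // Ask for every activity to be created and recorded
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    // Capture the span name plus the two Wolverine tags when a span ends
    ActivityStopped = stopped =>
    {
        var address = stopped.GetTagItem("wolverine.endpoint.address");
        var reason = stopped.GetTagItem("wolverine.stop.reason");
        observed.Add($"{stopped.OperationName} | {address} | {reason}");
    }
};
ActivitySource.AddActivityListener(listener);

// Simulate the span a listening agent emits when back pressure pauses it
using (var activity = source.StartActivity("wolverine.pausing.listener"))
{
    activity?.SetTag("wolverine.endpoint.address", new Uri("tcp://localhost:4000"));
    activity?.SetTag("wolverine.stop.reason", "TooBusy");
}

foreach (var line in observed)
{
    Console.WriteLine(line);
}
```

An OpenTelemetry exporter would normally take the place of the hand-written listener; the listener is used here only to keep the sample free of extra packages.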

## Message Correlation

::: tip
@@ -56,5 +56,5 @@ using var host = Host.CreateDefaultBuilder()

}).StartAsync();
```
-<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/ExceptionHandling.cs#L64-L87' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_scheduled_retry' title='Start of snippet'>anchor</a></sup>
+<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/ExceptionHandling.cs#L70-L93' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_scheduled_retry' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
2 changes: 1 addition & 1 deletion docs/guide/messaging/transports/external-tables.md
@@ -74,7 +74,7 @@ builder.UseWolverine(opts =>
.Sequential();
});
```
-<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs#L160-L223' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_external_database_messaging' title='Start of snippet'>anchor</a></sup>
+<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs#L187-L250' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_external_database_messaging' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

So a couple things to know:
8 changes: 4 additions & 4 deletions src/Wolverine/ErrorHandling/CircuitBreaker.cs
@@ -158,18 +158,18 @@ public ValueTask ProcessExceptionsAsync(DateTimeOffset time, object[] tokens)
         return UpdateTotalsAsync(time, failures, tokens.Length);
     }

-    public ValueTask UpdateTotalsAsync(DateTimeOffset time, int failures, int total)
+    public async ValueTask UpdateTotalsAsync(DateTimeOffset time, int failures, int total)
     {
         var generation = DetermineGeneration(time);
         generation.Failures += failures;
         generation.Total += total;

         if (failures > 0 && ShouldStopProcessing())
         {
-            return _circuit.PauseAsync(Options.PauseTime);
+            using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.CircuitBreakerTripped);
+            activity?.SetTag(WolverineTracing.EndpointAddress, _circuit.Endpoint.Uri);
+            await _circuit.PauseAsync(Options.PauseTime);
         }
-
-        return ValueTask.CompletedTask;
     }

     public Generation DetermineGeneration(DateTimeOffset now)
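
One plausible reading of why `UpdateTotalsAsync` became `async` is that a `using var activity` is disposed when the method returns, so returning the pending `ValueTask` directly would close the span before the pause finishes. The sketch below is a self-contained illustration of that difference using only `System.Diagnostics`; the "Demo.Circuit" source name, the local function names, and the 200 ms delay are hypothetical stand-ins, not Wolverine code.

```cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical source name used only for this sketch
using var source = new ActivitySource("Demo.Circuit");

var durations = new Dictionary<string, TimeSpan>();

using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Circuit",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
    // Record how long each span was open
    ActivityStopped = a => durations[a.OperationName] = a.Duration
};
ActivitySource.AddActivityListener(listener);

// Stand-in for _circuit.PauseAsync(...)
async ValueTask PauseAsync() => await Task.Delay(200);

// Non-async shape: the span is disposed as soon as the pending task is returned
ValueTask PauseWithoutAwait()
{
    using var activity = source.StartActivity("returned");
    return PauseAsync();
}

// Async shape: the span stays open until the pause has completed
async ValueTask PauseWithAwait()
{
    using var activity = source.StartActivity("awaited");
    await PauseAsync();
}

await PauseWithoutAwait();
await PauseWithAwait();

Console.WriteLine($"returned: {durations["returned"].TotalMilliseconds} ms");
Console.WriteLine($"awaited:  {durations["awaited"].TotalMilliseconds} ms");
```

The "awaited" span should cover roughly the whole delay, while the "returned" span ends almost immediately.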
1 change: 1 addition & 0 deletions src/Wolverine/ErrorHandling/PauseListenerContinuation.cs
@@ -31,6 +31,7 @@ public ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime ru

        if (agent != null)
        {
            activity?.AddTag(WolverineTracing.EndpointAddress, agent.Endpoint.Uri);
            activity?.AddEvent(new ActivityEvent(WolverineTracing.PausedListener));

            return agent.PauseAsync(PauseTime);
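
Unlike the listener spans, this hunk records an `ActivityEvent` on an activity that is already in flight. As a minimal sketch of what that looks like from the consuming side, the sample below attaches the same tag and event names to a stand-in span and reads the events back. The "Demo.Events" source, the "handle message" span name, and the address value are assumptions made up for the illustration.

```cs
using System;
using System.Diagnostics;
using System.Linq;

// Hypothetical source name used only for this sketch
using var source = new ActivitySource("Demo.Events");

using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Events",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
};
ActivitySource.AddActivityListener(listener);

// Stand-in for the span that wraps the handling of one message
using var activity = source.StartActivity("handle message");

// The same two calls as the hunk above: a tag, then a point-in-time event
activity?.AddTag("wolverine.endpoint.address", "tcp://localhost:4000");
activity?.AddEvent(new ActivityEvent("wolverine.paused.listener"));

// Events live on the enclosing span rather than becoming spans of their own
var eventNames = activity?.Events.Select(e => e.Name).ToArray() ?? Array.Empty<string>();
Console.WriteLine(string.Join(", ", eventNames));
```

Because the event belongs to the enclosing span, a tracing backend would typically show it as a timestamped annotation on the message handling span rather than as a separate entry.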
88 changes: 88 additions & 0 deletions src/Wolverine/Runtime/WolverineTracing.cs
@@ -22,13 +22,101 @@ public const string
public const string MessagingTempDestination = "messaging.temp_destination"; // boolean if this is temporary
public const string PayloadSizeBytes = "messaging.message_payload_size_bytes";

#region sample_wolverine_open_telemetry_tracing_spans_and_activities

/// <summary>
/// ActivityEvent marking when an incoming envelope is discarded
/// </summary>
public const string EnvelopeDiscarded = "wolverine.envelope.discarded";

/// <summary>
/// ActivityEvent marking when an incoming envelope is being moved to the error queue
/// </summary>
public const string MovedToErrorQueue = "wolverine.error.queued";

/// <summary>
/// ActivityEvent marking when an incoming envelope does not have a known message
/// handler and is being shunted to registered "NoHandler" actions
/// </summary>
public const string NoHandler = "wolverine.no.handler";

/// <summary>
/// ActivityEvent marking when a message failure is configured to pause the message listener
/// where the message was handled. This is tied to error handling policies
/// </summary>
public const string PausedListener = "wolverine.paused.listener";

/// <summary>
/// Span that is emitted when a listener circuit breaker determines that there are too many
/// failures and listening should be paused
/// </summary>
public const string CircuitBreakerTripped = "wolverine.circuit.breaker.triggered";

/// <summary>
/// Span emitted when a listening agent is started or restarted
/// </summary>
public const string StartingListener = "wolverine.starting.listener";

/// <summary>
/// Span emitted when a listening agent is stopping
/// </summary>
public const string StoppingListener = "wolverine.stopping.listener";

/// <summary>
/// Span emitted when a listening agent is being paused
/// </summary>
public const string PausingListener = "wolverine.pausing.listener";

/// <summary>
/// ActivityEvent marking that an incoming envelope is being requeued after a message
/// processing failure
/// </summary>
public const string EnvelopeRequeued = "wolverine.envelope.requeued";

/// <summary>
/// ActivityEvent marking that an incoming envelope is being retried after a message
/// processing failure
/// </summary>
public const string EnvelopeRetry = "wolverine.envelope.retried";

/// <summary>
/// ActivityEvent marking that an incoming envelope has been rescheduled for later
/// execution after a failure
/// </summary>
public const string ScheduledRetry = "wolverine.envelope.rescheduled";

/// <summary>
/// Tag name trying to explain why a sender or listener was stopped or paused
/// </summary>
public const string StopReason = "wolverine.stop.reason";

/// <summary>
/// The Wolverine Uri that identifies what sending or listening endpoint the activity
/// refers to
/// </summary>
public const string EndpointAddress = "wolverine.endpoint.address";

/// <summary>
/// A stop reason when back pressure policies call for a pause in processing in a single endpoint
/// </summary>
public const string TooBusy = "TooBusy";

/// <summary>
/// A span emitted when a sending agent for a specific endpoint is paused
/// </summary>
public const string SendingPaused = "wolverine.sending.pausing";

/// <summary>
/// A span emitted when a sending agent is resuming after having been paused
/// </summary>
public const string SendingResumed = "wolverine.sending.resumed";

/// <summary>
/// A stop reason when sending agents are paused after too many sender failures
/// </summary>
public const string TooManySenderFailures = "TooManySenderFailures";

#endregion

public static ActivitySource ActivitySource { get; } = new(
"Wolverine",
12 changes: 12 additions & 0 deletions src/Wolverine/Transports/ListeningAgent.cs
@@ -137,6 +137,9 @@ public async ValueTask StopAndDrainAsync()

        try
        {
            using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener);
            activity?.SetTag(WolverineTracing.EndpointAddress, Uri);

            await Listener.StopAsync();
            await _receiver!.DrainAsync();

@@ -164,6 +167,9 @@ public async ValueTask StartAsync()
            return;
        }

        using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StartingListener);
        activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address);

        _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync());

        if (Endpoint.ListenerCount > 1)
@@ -192,6 +198,8 @@ public async ValueTask PauseAsync(TimeSpan pauseTime)
    {
        try
        {
            using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener);
            activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address);
            await StopAndDrainAsync();
        }
        catch (Exception e)
@@ -212,6 +220,10 @@ public async ValueTask MarkAsTooBusyAndStopReceivingAsync()
        {
            return;
        }

        using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener);
        activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address);
        activity?.SetTag(WolverineTracing.StopReason, WolverineTracing.TooBusy);

        try
        {
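
Since `PauseAsync` opens a pausing span and then awaits `StopAndDrainAsync`, which opens its own stopping span, the two would be expected to nest as parent and child through `Activity.Current`. The sketch below models only that call shape with local functions. The "Demo.Nesting" source and the 10 ms delay are assumptions, and the real methods contain logic that is not shown in these hunks.

```cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical source name used only for this sketch
using var source = new ActivitySource("Demo.Nesting");

// Maps each span name to the name of its parent span (null for a root span)
var parents = new Dictionary<string, string?>();

using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Nesting",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
    ActivityStopped = a => parents[a.OperationName] = a.Parent?.OperationName
};
ActivitySource.AddActivityListener(listener);

// Models StopAndDrainAsync(): opens the stopping span around the shutdown work
async ValueTask StopAndDrainAsync()
{
    using var activity = source.StartActivity("wolverine.stopping.listener");
    await Task.Delay(10); // stand-in for stopping the listener and draining
}

// Models PauseAsync(): opens the pausing span, then delegates to StopAndDrainAsync()
async ValueTask PauseAsync()
{
    using var activity = source.StartActivity("wolverine.pausing.listener");
    await StopAndDrainAsync();
}

await PauseAsync();

foreach (var (name, parent) in parents)
{
    Console.WriteLine($"{name} <- parent: {parent ?? "(none)"}");
}
```

If the real code behaves the same way, a trace viewer would show the stopping span nested inside the pausing span for the same endpoint address.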
8 changes: 8 additions & 0 deletions src/Wolverine/Transports/Sending/SendingAgent.cs
@@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging;
using Wolverine.Configuration;
using Wolverine.Logging;
using Wolverine.Runtime;
using Wolverine.Util.Dataflow;

namespace Wolverine.Transports.Sending;
@@ -105,6 +106,9 @@ public Task<bool> TryToResumeAsync(CancellationToken cancellationToken)

    Task ISenderCircuit.ResumeAsync(CancellationToken cancellationToken)
    {
        using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.SendingResumed);
        activity?.SetTag(WolverineTracing.EndpointAddress, Endpoint.Uri);

        _circuitWatcher?.SafeDispose();
        _circuitWatcher = null;

@@ -257,6 +261,10 @@ private async Task markFailedAsync(OutgoingMessageBatch batch)

        if (_failureCount >= Endpoint.FailuresBeforeCircuitBreaks)
        {
            using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.SendingPaused);
            activity?.SetTag(WolverineTracing.StopReason, WolverineTracing.TooManySenderFailures);
            activity?.SetTag(WolverineTracing.EndpointAddress, Endpoint.Uri);

            await LatchAndDrainAsync();
            await EnqueueForRetryAsync(batch);
