diff --git a/docs/guide/durability/marten/event-sourcing.md b/docs/guide/durability/marten/event-sourcing.md index 2961c0d0..068a69fc 100644 --- a/docs/guide/durability/marten/event-sourcing.md +++ b/docs/guide/durability/marten/event-sourcing.md @@ -468,7 +468,7 @@ public static Task update_and_get_latest(IMessageBus bus, MarkItemReady c ``` snippet source | anchor - +] Likewise, you can use `UpdatedAggregate` as the response body of an HTTP endpoint with Wolverine.HTTP [as shown here](/guide/http/marten.html#responding-with-the-updated-aggregate~~~~). ::: info @@ -484,6 +484,24 @@ within the aggregate handler workflow as well. If you have a command handler met the `[Aggregate]` attribute in HTTP usage, you can also pass the aggregate type as an argument to any `Before` / `LoadAsync` / `Validate` method on that handler to do validation before the main handler method. Here's a sample from the tests of doing just that: -snippet: sample_passing_aggregate_into_validate_method + + +```cs +public record RaiseIfValidated(Guid LetterAggregateId); + +public static class RaiseIfValidatedHandler +{ + public static HandlerContinuation Validate(LetterAggregate aggregate) => + aggregate.ACount == 0 ? HandlerContinuation.Continue : HandlerContinuation.Stop; + + [AggregateHandler] + public static IEnumerable Handle(RaiseIfValidated command, LetterAggregate aggregate) + { + yield return new BEvent(); + } +} +``` +snippet source | anchor + diff --git a/docs/guide/handlers/sticky.md b/docs/guide/handlers/sticky.md index 5eac186d..37fd3892 100644 --- a/docs/guide/handlers/sticky.md +++ b/docs/guide/handlers/sticky.md @@ -25,7 +25,7 @@ message as an input. ```cs public class StickyMessage; ``` -snippet source | anchor +snippet source | anchor And we're going to handle that `StickyMessage` message separately with two different handler types: @@ -51,7 +51,7 @@ public static class GreenStickyHandler } } ``` -snippet source | anchor +snippet source | anchor ::: tip @@ -79,7 +79,7 @@ using var host = await Host.CreateDefaultBuilder() opts.ListenAtPort(4000).Named("blue"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With all of that being said, the end result of the two `StickyMessage` handlers that are marked with `[StickyHandler]` @@ -119,7 +119,7 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/logging.md b/docs/guide/logging.md index 48479afd..6d5688c5 100644 --- a/docs/guide/logging.md +++ b/docs/guide/logging.md @@ -254,6 +254,107 @@ using var host = await Host.CreateDefaultBuilder() Note that this `TelemetryEnabled()` method is available on all possible subscriber and listener types within Wolverine. This flag applies to all messages sent, received, or executed at a particular endpoint. +Wolverine endeavors to publish OpenTelemetry spans or activities for meaningful actions within a Wolverine application. Here +are the specific span names, activity names, and tag names emitted by Wolverine: + + + +```cs +/// +/// ActivityEvent marking when an incoming envelope is discarded +/// +public const string EnvelopeDiscarded = "wolverine.envelope.discarded"; + +/// +/// ActivityEvent marking when an incoming envelope is being moved to the error queue +/// +public const string MovedToErrorQueue = "wolverine.error.queued"; + +/// +/// ActivityEvent marking when an incoming envelope does not have a known message +/// handler and is being shunted to registered "NoHandler" actions +/// +public const string NoHandler = "wolverine.no.handler"; + +/// +/// ActivityEvent marking when a message failure is configured to pause the message listener +/// where the message was handled. This is tied to error handling policies +/// +public const string PausedListener = "wolverine.paused.listener"; + +/// +/// Span that is emitted when a listener circuit breaker determines that there are too many +/// failures and listening should be paused +/// +public const string CircuitBreakerTripped = "wolverine.circuit.breaker.triggered"; + +/// +/// Span emitted when a listening agent is started or restarted +/// +public const string StartingListener = "wolverine.starting.listener"; + +/// +/// Span emitted when a listening agent is stopping +/// +public const string StoppingListener = "wolverine.stopping.listener"; + +/// +/// Span emitted when a listening agent is being paused +/// +public const string PausingListener = "wolverine.pausing.listener"; + +/// +/// ActivityEvent marking that an incoming envelope is being requeued after a message +/// processing failure +/// +public const string EnvelopeRequeued = "wolverine.envelope.requeued"; + +/// +/// ActivityEvent marking that an incoming envelope is being retried after a message +/// processing failure +/// +public const string EnvelopeRetry = "wolverine.envelope.retried"; + +/// +/// ActivityEvent marking than an incoming envelope has been rescheduled for later +/// execution after a failure +/// +public const string ScheduledRetry = "wolverine.envelope.rescheduled"; + +/// +/// Tag name trying to explain why a sender or listener was stopped or paused +/// +public const string StopReason = "wolverine.stop.reason"; + +/// +/// The Wolverine Uri that identifies what sending or listening endpoint the activity +/// refers to +/// +public const string EndpointAddress = "wolverine.endpoint.address"; + +/// +/// A stop reason when back pressure policies call for a pause in processing in a single endpoint +/// +public const string TooBusy = "TooBusy"; + +/// +/// A span emitted when a sending agent for a specific endpoint is paused +/// +public const string SendingPaused = "wolverine.sending.pausing"; + +/// +/// A span emitted when a sending agent is resuming after having been paused +/// +public const string SendingResumed = "wolverine.sending.resumed"; + +/// +/// A stop reason when sending agents are paused after too many sender failures +/// +public const string TooManySenderFailures = "TooManySenderFailures"; +``` +snippet source | anchor + + ## Message Correlation ::: tip diff --git a/docs/guide/messaging/transports/azureservicebus/scheduled.md b/docs/guide/messaging/transports/azureservicebus/scheduled.md index b379ada6..74523be5 100644 --- a/docs/guide/messaging/transports/azureservicebus/scheduled.md +++ b/docs/guide/messaging/transports/azureservicebus/scheduled.md @@ -56,5 +56,5 @@ using var host = Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/external-tables.md b/docs/guide/messaging/transports/external-tables.md index 65481ce9..86f719f3 100644 --- a/docs/guide/messaging/transports/external-tables.md +++ b/docs/guide/messaging/transports/external-tables.md @@ -74,7 +74,7 @@ builder.UseWolverine(opts => .Sequential(); }); ``` -snippet source | anchor +snippet source | anchor So a couple things to know: diff --git a/src/Wolverine/ErrorHandling/CircuitBreaker.cs b/src/Wolverine/ErrorHandling/CircuitBreaker.cs index d9cb3a9f..562daa08 100644 --- a/src/Wolverine/ErrorHandling/CircuitBreaker.cs +++ b/src/Wolverine/ErrorHandling/CircuitBreaker.cs @@ -158,7 +158,7 @@ public ValueTask ProcessExceptionsAsync(DateTimeOffset time, object[] tokens) return UpdateTotalsAsync(time, failures, tokens.Length); } - public ValueTask UpdateTotalsAsync(DateTimeOffset time, int failures, int total) + public async ValueTask UpdateTotalsAsync(DateTimeOffset time, int failures, int total) { var generation = DetermineGeneration(time); generation.Failures += failures; @@ -166,10 +166,10 @@ public ValueTask UpdateTotalsAsync(DateTimeOffset time, int failures, int total) if (failures > 0 && ShouldStopProcessing()) { - return _circuit.PauseAsync(Options.PauseTime); + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.CircuitBreakerTripped); + activity?.SetTag(WolverineTracing.EndpointAddress, _circuit.Endpoint.Uri); + await _circuit.PauseAsync(Options.PauseTime); } - - return ValueTask.CompletedTask; } public Generation DetermineGeneration(DateTimeOffset now) diff --git a/src/Wolverine/ErrorHandling/PauseListenerContinuation.cs b/src/Wolverine/ErrorHandling/PauseListenerContinuation.cs index 87ea871a..7fa6d94b 100644 --- a/src/Wolverine/ErrorHandling/PauseListenerContinuation.cs +++ b/src/Wolverine/ErrorHandling/PauseListenerContinuation.cs @@ -31,6 +31,7 @@ public ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime ru if (agent != null) { + activity?.AddTag(WolverineTracing.EndpointAddress, agent.Endpoint.Uri); activity?.AddEvent(new ActivityEvent(WolverineTracing.PausedListener)); return agent.PauseAsync(PauseTime); diff --git a/src/Wolverine/Runtime/WolverineTracing.cs b/src/Wolverine/Runtime/WolverineTracing.cs index 100cbd4a..3194d783 100644 --- a/src/Wolverine/Runtime/WolverineTracing.cs +++ b/src/Wolverine/Runtime/WolverineTracing.cs @@ -22,13 +22,101 @@ public const string public const string MessagingTempDestination = "messaging.temp_destination"; // boolean if this is temporary public const string PayloadSizeBytes = "messaging.message_payload_size_bytes"; + #region sample_wolverine_open_telemetry_tracing_spans_and_activities + + /// + /// ActivityEvent marking when an incoming envelope is discarded + /// public const string EnvelopeDiscarded = "wolverine.envelope.discarded"; + + /// + /// ActivityEvent marking when an incoming envelope is being moved to the error queue + /// public const string MovedToErrorQueue = "wolverine.error.queued"; + + /// + /// ActivityEvent marking when an incoming envelope does not have a known message + /// handler and is being shunted to registered "NoHandler" actions + /// public const string NoHandler = "wolverine.no.handler"; + + /// + /// ActivityEvent marking when a message failure is configured to pause the message listener + /// where the message was handled. This is tied to error handling policies + /// public const string PausedListener = "wolverine.paused.listener"; + + /// + /// Span that is emitted when a listener circuit breaker determines that there are too many + /// failures and listening should be paused + /// + public const string CircuitBreakerTripped = "wolverine.circuit.breaker.triggered"; + + /// + /// Span emitted when a listening agent is started or restarted + /// + public const string StartingListener = "wolverine.starting.listener"; + + /// + /// Span emitted when a listening agent is stopping + /// + public const string StoppingListener = "wolverine.stopping.listener"; + + /// + /// Span emitted when a listening agent is being paused + /// + public const string PausingListener = "wolverine.pausing.listener"; + + /// + /// ActivityEvent marking that an incoming envelope is being requeued after a message + /// processing failure + /// public const string EnvelopeRequeued = "wolverine.envelope.requeued"; + + /// + /// ActivityEvent marking that an incoming envelope is being retried after a message + /// processing failure + /// public const string EnvelopeRetry = "wolverine.envelope.retried"; + + /// + /// ActivityEvent marking than an incoming envelope has been rescheduled for later + /// execution after a failure + /// public const string ScheduledRetry = "wolverine.envelope.rescheduled"; + + /// + /// Tag name trying to explain why a sender or listener was stopped or paused + /// + public const string StopReason = "wolverine.stop.reason"; + + /// + /// The Wolverine Uri that identifies what sending or listening endpoint the activity + /// refers to + /// + public const string EndpointAddress = "wolverine.endpoint.address"; + + /// + /// A stop reason when back pressure policies call for a pause in processing in a single endpoint + /// + public const string TooBusy = "TooBusy"; + + /// + /// A span emitted when a sending agent for a specific endpoint is paused + /// + public const string SendingPaused = "wolverine.sending.pausing"; + + /// + /// A span emitted when a sending agent is resuming after having been paused + /// + public const string SendingResumed = "wolverine.sending.resumed"; + + /// + /// A stop reason when sending agents are paused after too many sender failures + /// + public const string TooManySenderFailures = "TooManySenderFailures"; + + #endregion public static ActivitySource ActivitySource { get; } = new( "Wolverine", diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 5065abb4..3df9e9eb 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -137,6 +137,9 @@ public async ValueTask StopAndDrainAsync() try { + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StoppingListener); + activity?.SetTag(WolverineTracing.EndpointAddress, Uri); + await Listener.StopAsync(); await _receiver!.DrainAsync(); @@ -164,6 +167,9 @@ public async ValueTask StartAsync() return; } + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.StartingListener); + activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address); + _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync()); if (Endpoint.ListenerCount > 1) @@ -192,6 +198,8 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) { try { + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener); + activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address); await StopAndDrainAsync(); } catch (Exception e) @@ -212,6 +220,10 @@ public async ValueTask MarkAsTooBusyAndStopReceivingAsync() { return; } + + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener); + activity?.SetTag(WolverineTracing.EndpointAddress, Listener.Address); + activity?.SetTag(WolverineTracing.StopReason, WolverineTracing.TooBusy); try { diff --git a/src/Wolverine/Transports/Sending/SendingAgent.cs b/src/Wolverine/Transports/Sending/SendingAgent.cs index 9038b8be..cc6ffc3e 100644 --- a/src/Wolverine/Transports/Sending/SendingAgent.cs +++ b/src/Wolverine/Transports/Sending/SendingAgent.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using Wolverine.Configuration; using Wolverine.Logging; +using Wolverine.Runtime; using Wolverine.Util.Dataflow; namespace Wolverine.Transports.Sending; @@ -105,6 +106,9 @@ public Task TryToResumeAsync(CancellationToken cancellationToken) Task ISenderCircuit.ResumeAsync(CancellationToken cancellationToken) { + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.SendingResumed); + activity?.SetTag(WolverineTracing.EndpointAddress, Endpoint.Uri); + _circuitWatcher?.SafeDispose(); _circuitWatcher = null; @@ -257,6 +261,10 @@ private async Task markFailedAsync(OutgoingMessageBatch batch) if (_failureCount >= Endpoint.FailuresBeforeCircuitBreaks) { + using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.SendingPaused); + activity?.SetTag(WolverineTracing.StopReason, WolverineTracing.TooManySenderFailures); + activity?.SetTag(WolverineTracing.EndpointAddress, Endpoint.Uri); + await LatchAndDrainAsync(); await EnqueueForRetryAsync(batch);