Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Include OpenTelemetry tracing #431

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
Expand All @@ -22,6 +22,7 @@
<ProjectReference Include="..\..\src\KafkaFlow.Compressor\KafkaFlow.Compressor.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.JsonCore\KafkaFlow.Serializer.JsonCore.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
Expand All @@ -31,6 +32,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="OpenTelemetry" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.6.0" />
</ItemGroup>

</Project>
11 changes: 11 additions & 0 deletions samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.OpenTelemetry.Trace;
using KafkaFlow.Producers;
using KafkaFlow.Sample;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry;
using OpenTelemetry.Trace;

var services = new ServiceCollection();

Expand Down Expand Up @@ -38,6 +43,7 @@
)
)
)
.AddOpenTelemetryInstrumentation()
);

var provider = services.BuildServiceProvider();
Expand All @@ -52,6 +58,11 @@

Console.WriteLine("Type the number of messages to produce or 'exit' to quit:");

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow")
.AddConsoleExporter()
.Build();

while (true)
{
var input = Console.ReadLine();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Configuration

Check warning on line 1 in src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / release

Missing XML comment for publicly visible type or member 'IKafkaConfigurationBuilder.SubscribeEvents(Action<IEventsListener>)' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
{
using System;
using KafkaFlow.Events;

/// <summary>
/// A builder to configure KafkaFlow
Expand All @@ -21,5 +22,7 @@
/// <returns></returns>
IKafkaConfigurationBuilder UseLogHandler<TLogHandler>()
where TLogHandler : ILogHandler;

IKafkaConfigurationBuilder SubscribeEvents(Action<IEventsListener> eventsListener);

Check warning on line 26 in src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IKafkaConfigurationBuilder.SubscribeEvents(Action<IEventsListener>)'
}
}
14 changes: 14 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

Check warning on line 1 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / release

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / release

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs.ConsumeErrorEventArgs(Exception)' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / release

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs.Exception' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

namespace KafkaFlow.Events.Args
{
public class ConsumeErrorEventArgs

Check warning on line 5 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs'
{
public ConsumeErrorEventArgs(Exception exception)

Check warning on line 7 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs.ConsumeErrorEventArgs(Exception)'
{
this.Exception = exception;
}

public Exception Exception { get; set; }

Check warning on line 12 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs.Exception'
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.Events.Args

Check warning on line 1 in src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs

View workflow job for this annotation

GitHub Actions / release

Missing XML comment for publicly visible type or member 'ConsumeStartEventArgs' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs

View workflow job for this annotation

GitHub Actions / release

Missing XML comment for publicly visible type or member 'ConsumeStartEventArgs.MessageContext' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
{
public class ConsumeStartEventArgs

Check warning on line 3 in src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeStartEventArgs'
{
internal ConsumeStartEventArgs(IMessageContext context)
{
this.MessageContext = context;
}

public IMessageContext MessageContext { get; set; }

Check warning on line 10 in src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeStartEventArgs.MessageContext'
}
}
14 changes: 14 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace KafkaFlow.Events.Args
{
public class ProduceErrorEventArgs
{
public ProduceErrorEventArgs(Exception exception)
{
this.Exception = exception;
}

public Exception Exception { get; set; }
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.Events.Args
{
public class ProduceStartEventArgs
{
internal ProduceStartEventArgs(IMessageContext context)
{
this.MessageContext = context;
}

public IMessageContext MessageContext { get; set; }
}
}
36 changes: 36 additions & 0 deletions src/KafkaFlow.Abstractions/Events/EventsManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using KafkaFlow.Events.Args;

namespace KafkaFlow.Events
{
public class EventsManager : IEventsListener, IEventsNotifier
{
public event EventHandler<ConsumeStartEventArgs> OnConsumeStart;

public event EventHandler<ProduceStartEventArgs> OnProduceStart;

public event EventHandler<ConsumeErrorEventArgs> OnConsumeError;

public event EventHandler<ProduceErrorEventArgs> OnProduceError;

public void NotifyOnConsumeError(Exception exception)
{
this.OnConsumeError?.Invoke(this, new ConsumeErrorEventArgs(exception));
}

public void NotifyOnConsumeStart(IMessageContext context)
{
this.OnConsumeStart?.Invoke(this, new ConsumeStartEventArgs(context));
}

public void NotifyOnProduceError(Exception exception)
{
this.OnProduceError?.Invoke(this, new ProduceErrorEventArgs(exception));
}

public void NotifyOnProduceStart(IMessageContext context)
{
this.OnProduceStart?.Invoke(this, new ProduceStartEventArgs(context));
}
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Events/IEventsListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using KafkaFlow.Events.Args;

namespace KafkaFlow.Events
{
public interface IEventsListener
{
event EventHandler<ConsumeStartEventArgs> OnConsumeStart;

event EventHandler<ProduceStartEventArgs> OnProduceStart;

event EventHandler<ConsumeErrorEventArgs> OnConsumeError;

event EventHandler<ProduceErrorEventArgs> OnProduceError;
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace KafkaFlow.Events
{
// Isto devia ser internal visto que a notificação de events não deveria ser publica, apenas a subscrição desses eventos

// Falta o OnConsumeEnd e OnProduceEnd
public interface IEventsNotifier
{
void NotifyOnConsumeError(Exception exception);

void NotifyOnConsumeStart(IMessageContext context);

void NotifyOnProduceError(Exception exception);

void NotifyOnProduceStart(IMessageContext context);
}
}
4 changes: 4 additions & 0 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
Expand Down Expand Up @@ -73,5 +74,8 @@ public interface IConsumerContext
/// Resume Kafka's message fetch
/// </summary>
void Resume();

// TODO: Need to discuss
IReadOnlyCollection<string> Brokers { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@
<ProjectReference Include="..\KafkaFlow.Admin\KafkaFlow.Admin.csproj" />
<ProjectReference Include="..\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace KafkaFlow.Configuration
{
using KafkaFlow.OpenTelemetry.Trace;

/// <summary>
/// Adds OpenTelemetry instrumentation
/// </summary>
public static class ExtensionMethods
{
/// <summary>
/// Adds OpenTelemetry instrumentation
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <returns></returns>
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
{
var tracerConsumerMiddleware = new TracerConsumerMiddleware();
var tracerProducerMiddleware = new TracerProducerMiddleware();

builder.SubscribeEvents(events =>
{
events.OnConsumeError += (sender, args) => tracerConsumerMiddleware.UpdateActivityOnError(args.Exception);

Check notice on line 22 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L22

'sender' is not used. Use discard parameter instead.
events.OnConsumeStart += (sender, args) => tracerConsumerMiddleware.CreateActivityOnConsume(args.MessageContext);

Check notice on line 23 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L23

'sender' is not used. Use discard parameter instead.
events.OnProduceError += (sender, args) => tracerProducerMiddleware.UpdateActivityOnError(args.Exception);

Check notice on line 24 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L24

'sender' is not used. Use discard parameter instead.
events.OnProduceStart += (sender, args) => tracerProducerMiddleware.CreateActivityOnProduce(args.MessageContext);

Check notice on line 25 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L25

'sender' is not used. Use discard parameter instead.
});

return builder;
}
}
}
19 changes: 19 additions & 0 deletions src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Extensions.Propagators" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.SemanticConventions" Version="1.0.0-rc9.9">
<Aliases>SemanticConventions</Aliases>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\KafkaFlow.Abstractions\KafkaFlow.Abstractions.csproj" />
</ItemGroup>

</Project>
33 changes: 33 additions & 0 deletions src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
extern alias SemanticConventions;

namespace KafkaFlow.OpenTelemetry.Trace
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

public static class KafkaFlowActivitySourceHelper
{
internal static readonly IEnumerable<KeyValuePair<string, object>> CreationTags = new[]

Check warning on line 13 in src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs#L13

Move this field's initializer into a static constructor.
{
new KeyValuePair<string, object>(Conventions.AttributePeerService, KafkaString),
new KeyValuePair<string, object>(Conventions.AttributeMessagingSystem, KafkaString),
};

internal static readonly string KafkaFlowString = "KafkaFlow";

Check notice on line 19 in src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs#L19

Replace this 'static readonly' declaration with 'const'.
internal static readonly string KafkaString = "kafka";

Check notice on line 20 in src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs#L20

Replace this 'static readonly' declaration with 'const'.

private static readonly Version Version = Assembly.GetExecutingAssembly().GetName().Version;

public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString());

public static void SetGenericTags(Activity activity)
{
activity?.SetTag("messaging.system", KafkaString);
// TODO: Broker information below. Set values after
activity?.SetTag("peer.service", KafkaString);
}
}
}
Loading
Loading