Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release/3.0' into refactor!/me…
Browse files Browse the repository at this point in the history
…rge-projects

# Conflicts:
#	src/KafkaFlow.BatchConsume/KafkaFlow.BatchConsume.csproj
#	src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs
#	src/KafkaFlow.TypedHandler/ConfigurationBuilderExtensions.cs
#	src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs
#	src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs
  • Loading branch information
jose-sousa-8 committed Oct 4, 2023
2 parents d537d26 + 87cd205 commit 5a85aa1
Show file tree
Hide file tree
Showing 97 changed files with 4,002 additions and 13,760 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoMessageCompletion = false;
this.logHandler.Error("Error handling message", exception,
new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Used to build the consumer configuration
Expand Down Expand Up @@ -91,6 +92,24 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkersCount(int workersCount);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <param name="evaluationInterval">The interval that the calculator will be called</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(
Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator,
TimeSpan evaluationInterval);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// By default, this function is called every 5 minutes.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator);

/// <summary>
/// Sets how many messages will be buffered for each worker
/// </summary>
Expand Down Expand Up @@ -132,16 +151,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;

/// <summary>
/// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithAutoStoreOffsets();

/// <summary>
/// The client should call the <see cref="IConsumerContext.StoreOffset()"/>
/// Configures the consumer for manual message completion.
/// The client should call the <see cref="IConsumerContext.Complete"/> to mark the message processing as finished
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithManualStoreOffsets();
IConsumerConfigurationBuilder WithManualMessageCompletion();

/// <summary>
/// No offsets will be stored on Kafka
Expand Down
41 changes: 41 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace KafkaFlow.Configuration
{
using System.Collections.Generic;

/// <summary>
/// A metadata class with some context information help to calculate the number of workers
/// </summary>
public class WorkersCountContext
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkersCountContext"/> class.
/// </summary>
/// <param name="consumerName">The consumer's name</param>
/// <param name="consumerGroupId">The consumer's group id</param>
/// <param name="assignedTopicsPartitions">The consumer's assigned partition</param>
public WorkersCountContext(
string consumerName,
string consumerGroupId,
IReadOnlyCollection<TopicPartitions> assignedTopicsPartitions)
{
this.ConsumerName = consumerName;
this.ConsumerGroupId = consumerGroupId;
this.AssignedTopicsPartitions = assignedTopicsPartitions;
}

/// <summary>
/// Gets the consumer's name
/// </summary>
public string ConsumerName { get; }

/// <summary>
/// Gets the consumer's group id
/// </summary>
public string ConsumerGroupId { get; }

/// <summary>
/// Gets the assigned partitions to the consumer
/// </summary>
public IReadOnlyCollection<TopicPartitions> AssignedTopicsPartitions { get; }
}
}
28 changes: 28 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace KafkaFlow
{
/// <summary>
/// Represents the interface of a internal worker
/// </summary>
public interface IWorker
{
/// <summary>
/// Gets worker's id
/// </summary>
int Id { get; }

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
IEvent WorkerStopped { get; }

/// <summary>
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
/// </summary>
IEvent<IMessageContext> WorkerProcessingEnded { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.Add<TService>(InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
Expand Down
41 changes: 38 additions & 3 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow
{
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents the message consumer
Expand Down Expand Up @@ -38,6 +39,11 @@ public interface IConsumerContext
/// </summary>
long Offset { get; }

/// <summary>
/// Gets the <see cref="TopicPartitionOffset"/> object associated with the message
/// </summary>
TopicPartitionOffset TopicPartitionOffset { get; }

/// <summary>
/// Gets the consumer group id from kafka consumer that received the message
/// </summary>
Expand All @@ -49,14 +55,43 @@ public interface IConsumerContext
DateTime MessageTimestamp { get; }

/// <summary>
/// Gets or sets a value indicating whether if the framework should store the current offset in the end when auto store offset is used
/// Gets or sets a value indicating whether if the framework should invoke the <see cref="Complete"/> method after the message has been processed
/// </summary>
bool AutoMessageCompletion { get; set; }

/// <summary>
/// Gets or sets a value indicating whether if the message offset must be stored when the message is marked as completed
/// </summary>
bool ShouldStoreOffset { get; set; }

/// <summary>
/// Store the message offset when manual store option is used
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the consumer scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single consumer.
/// </summary>
IDependencyResolver ConsumerDependencyResolver { get; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the worker scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single worker.
/// </summary>
IDependencyResolver WorkerDependencyResolver { get; }

/// <summary>
/// Gets a Task that completes when the <see cref="Complete"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
Task<TopicPartitionOffset> Completion { get; }

/// <summary>
/// Signals the completion of message processing and stores the message offset to eventually be committed.
/// After this call, the framework marks the message processing as finished and releases resources associated with the message.
/// By default, this method is automatically invoked when message processing concludes, unless
/// the consumer is configured for manual message completion or the <see cref="AutoMessageCompletion"/> flag is set to false.
/// </summary>
void StoreOffset();
void Complete();

/// <summary>
/// Get offset watermark data
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow.Abstractions/IDateTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace KafkaFlow
/// </summary>
public interface IDateTimeProvider
{
/// <inheritdoc cref="DateTime.Now"/>
DateTime Now { get; }
/// <inheritdoc cref="DateTime.UtcNow"/>
DateTime UtcNow { get; }

/// <inheritdoc cref="DateTime.MinValue"/>
DateTime MinValue { get; }
Expand Down
32 changes: 32 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<TArg>
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/IEventSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow;

/// <summary>
/// Represents an Event subscription.
/// </summary>
public interface IEventSubscription
{
/// <summary>
/// Cancels the subscription to the event.
/// </summary>
void Cancel();
}
17 changes: 7 additions & 10 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace KafkaFlow
{
using System;

/// <summary>
/// A context that contains the message and metadata
/// </summary>
Expand All @@ -27,20 +25,19 @@ public interface IMessageContext
/// </summary>
IProducerContext ProducerContext { get; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the message scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single processed message.
/// </summary>
IDependencyResolver DependencyResolver { get; }

/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
/// <param name="key">The new message key</param>
/// <param name="value">The new message value</param>
/// <returns>A new message context containing the new values</returns>
IMessageContext SetMessage(object key, object value);

/// <summary>
/// Deprecated
/// </summary>
/// <param name="message">key</param>
/// <returns></returns>
[Obsolete("This method should no longer be used, use the " + nameof(SetMessage) + "() instead.", true)]
IMessageContext TransformMessage(object message);
}
}
40 changes: 23 additions & 17 deletions src/KafkaFlow.Abstractions/IProducerContext.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
namespace KafkaFlow
namespace KafkaFlow;

/// <summary>
/// Some producer metadata
/// </summary>
public interface IProducerContext
{
/// <summary>
/// Some producer metadata
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
public interface IProducerContext
{
/// <summary>
/// Gets the topic associated with the message
/// </summary>
string Topic { get; }
int? Partition { get; }

/// <summary>
/// Gets the partition associated with the message
/// </summary>
int? Partition { get; }
/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long? Offset { get; }

/// <summary>
/// Gets the partition offset associated with the message
/// </summary>
long? Offset { get; }
}
/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the producer scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single producer.
/// </summary>
IDependencyResolver DependencyResolver { get; }
}
21 changes: 0 additions & 21 deletions src/KafkaFlow.Abstractions/IWorker.cs

This file was deleted.

Loading

0 comments on commit 5a85aa1

Please sign in to comment.