diff --git a/samples/KafkaFlow.Retry.API.Sample/Program.cs b/samples/KafkaFlow.Retry.API.Sample/Program.cs index 5dc1cfc7..b2cf4f02 100644 --- a/samples/KafkaFlow.Retry.API.Sample/Program.cs +++ b/samples/KafkaFlow.Retry.API.Sample/Program.cs @@ -10,10 +10,9 @@ public static void Main(string[] args) CreateHostBuilder(args).Build().Run(); } - public static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .ConfigureWebHostDefaults(webBuilder => - { - webBuilder.UseStartup(); - }); + public static IHostBuilder CreateHostBuilder(string[] args) + { + return Host.CreateDefaultBuilder(args) + .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup(); }); + } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.API.Sample/Startup.cs b/samples/KafkaFlow.Retry.API.Sample/Startup.cs index 852b4d1a..1460852b 100644 --- a/samples/KafkaFlow.Retry.API.Sample/Startup.cs +++ b/samples/KafkaFlow.Retry.API.Sample/Startup.cs @@ -11,49 +11,46 @@ public class Startup { public Startup(IConfiguration configuration) { - Configuration = configuration; - } + Configuration = configuration; + } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { - if (env.IsDevelopment()) - { - app.UseDeveloperExceptionPage(); - } + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } - app.UseHttpsRedirection(); + app.UseHttpsRedirection(); - app.UseRouting(); + app.UseRouting(); - app.UseAuthorization(); + app.UseAuthorization(); - app.UseKafkaFlowRetryEndpoints(); + app.UseKafkaFlowRetryEndpoints(); - app.UseEndpoints(endpoints => - { - endpoints.MapControllers(); - }); - } + app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); + } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { - services.AddSingleton(sp => - new MongoDbDataProviderFactory() - .TryCreate( - new MongoDbSettings - { - ConnectionString = "mongodb://localhost:27017/SVC_KAFKA_FLOW_RETRY_DURABLE", - DatabaseName = "SVC_KAFKA_FLOW_RETRY_DURABLE", - RetryQueueCollectionName = "RetryQueues", - RetryQueueItemCollectionName = "RetryQueueItems" - } - ).Result - ); - - services.AddControllers(); - } + services.AddSingleton(sp => + new MongoDbDataProviderFactory() + .TryCreate( + new MongoDbSettings + { + ConnectionString = "mongodb://localhost:27017/SVC_KAFKA_FLOW_RETRY_DURABLE", + DatabaseName = "SVC_KAFKA_FLOW_RETRY_DURABLE", + RetryQueueCollectionName = "RetryQueues", + RetryQueueItemCollectionName = "RetryQueueItems" + } + ).Result + ); + + services.AddControllers(); + } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.Common.Sample/Helpers/KafkaHelper.cs b/samples/KafkaFlow.Retry.Common.Sample/Helpers/KafkaHelper.cs index cf7edc26..1c8bc83c 100644 --- a/samples/KafkaFlow.Retry.Common.Sample/Helpers/KafkaHelper.cs +++ b/samples/KafkaFlow.Retry.Common.Sample/Helpers/KafkaHelper.cs @@ -2,6 +2,8 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Confluent.Kafka; +using Confluent.Kafka.Admin; namespace KafkaFlow.Retry.Common.Sample.Helpers; @@ -9,7 +11,8 @@ public static class KafkaHelper { public static async Task CreateKafkaTopics(string kafkaBrokers, string[] topics) { - using (var adminClient = new Confluent.Kafka.AdminClientBuilder(new Confluent.Kafka.AdminClientConfig { BootstrapServers = kafkaBrokers }).Build()) + using (var adminClient = + new AdminClientBuilder(new AdminClientConfig { BootstrapServers = kafkaBrokers }).Build()) { foreach (var topic in topics) { @@ -19,13 +22,14 @@ public static async Task CreateKafkaTopics(string kafkaBrokers, string[] topics) try { var deleteTopicRecords = new List(); - for (int i = 0; i < topicMetadata.Topics.First().Partitions.Count; i++) + for (var i = 0; i < topicMetadata.Topics.First().Partitions.Count; i++) { - deleteTopicRecords.Add(new Confluent.Kafka.TopicPartitionOffset(topic, i, Confluent.Kafka.Offset.End)); + deleteTopicRecords.Add(new Confluent.Kafka.TopicPartitionOffset(topic, i, Offset.End)); } + await adminClient.DeleteRecordsAsync(deleteTopicRecords).ConfigureAwait(false); } - catch (Confluent.Kafka.Admin.DeleteRecordsException e) + catch (DeleteRecordsException e) { Console.WriteLine($"An error occured deleting topic records: {e.Results[0].Error.Reason}"); } @@ -36,9 +40,9 @@ public static async Task CreateKafkaTopics(string kafkaBrokers, string[] topics) { await adminClient .CreatePartitionsAsync( - new List + new List { - new Confluent.Kafka.Admin.PartitionsSpecification + new() { Topic = topic, IncreaseTo = 6 @@ -46,9 +50,9 @@ await adminClient }) .ConfigureAwait(false); } - catch (Confluent.Kafka.Admin.CreateTopicsException e) + catch (CreateTopicsException e) { - if (e.Results[0].Error.Code != Confluent.Kafka.ErrorCode.UnknownTopicOrPart) + if (e.Results[0].Error.Code != ErrorCode.UnknownTopicOrPart) { Console.WriteLine($"An error occured creating a topic: {e.Results[0].Error.Reason}"); } diff --git a/samples/KafkaFlow.Retry.Common.Sample/Helpers/SqlServerHelper.cs b/samples/KafkaFlow.Retry.Common.Sample/Helpers/SqlServerHelper.cs index 0bb36d14..0656ac6b 100644 --- a/samples/KafkaFlow.Retry.Common.Sample/Helpers/SqlServerHelper.cs +++ b/samples/KafkaFlow.Retry.Common.Sample/Helpers/SqlServerHelper.cs @@ -1,9 +1,10 @@ -using System.Collections.Generic; -using Microsoft.Data.SqlClient; +using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Threading.Tasks; +using Microsoft.Data.SqlClient; namespace KafkaFlow.Retry.Common.Sample.Helpers; @@ -11,19 +12,19 @@ public static class SqlServerHelper { public static async Task RecreateSqlSchema(string databaseName, string connectionString) { - using (SqlConnection openCon = new SqlConnection(connectionString)) + using (var openCon = new SqlConnection(connectionString)) { openCon.Open(); foreach (var script in GetScriptsForSchemaCreation()) { - string[] batches = script.Split(new[] { "GO\r\n", "GO\t", "GO\n" }, System.StringSplitOptions.RemoveEmptyEntries); + var batches = script.Split(new[] { "GO\r\n", "GO\t", "GO\n" }, StringSplitOptions.RemoveEmptyEntries); foreach (var batch in batches) { - string replacedBatch = batch.Replace("@dbname", databaseName); + var replacedBatch = batch.Replace("@dbname", databaseName); - using (SqlCommand queryCommand = new SqlCommand(replacedBatch)) + using (var queryCommand = new SqlCommand(replacedBatch)) { queryCommand.Connection = openCon; @@ -36,15 +37,15 @@ public static async Task RecreateSqlSchema(string databaseName, string connectio private static IEnumerable GetScriptsForSchemaCreation() { - Assembly sqlServerAssembly = Assembly.LoadFrom("KafkaFlow.Retry.SqlServer.dll"); + var sqlServerAssembly = Assembly.LoadFrom("KafkaFlow.Retry.SqlServer.dll"); return sqlServerAssembly .GetManifestResourceNames() .OrderBy(x => x) .Select(script => { - using (Stream s = sqlServerAssembly.GetManifestResourceStream(script)) + using (var s = sqlServerAssembly.GetManifestResourceStream(script)) { - using (StreamReader sr = new StreamReader(s)) + using (var sr = new StreamReader(s)) { return sr.ReadToEnd(); } diff --git a/samples/KafkaFlow.Retry.Sample/Exceptions/RetryDurableTestException.cs b/samples/KafkaFlow.Retry.Sample/Exceptions/RetryDurableTestException.cs index f2542217..50c095b8 100644 --- a/samples/KafkaFlow.Retry.Sample/Exceptions/RetryDurableTestException.cs +++ b/samples/KafkaFlow.Retry.Sample/Exceptions/RetryDurableTestException.cs @@ -5,5 +5,6 @@ namespace KafkaFlow.Retry.Sample.Exceptions; public class RetryDurableTestException : Exception { public RetryDurableTestException(string message) : base(message) - { } + { + } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.Sample/Handlers/RetryDurableTestHandler.cs b/samples/KafkaFlow.Retry.Sample/Handlers/RetryDurableTestHandler.cs index d86a14bb..c178578a 100644 --- a/samples/KafkaFlow.Retry.Sample/Handlers/RetryDurableTestHandler.cs +++ b/samples/KafkaFlow.Retry.Sample/Handlers/RetryDurableTestHandler.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using KafkaFlow; using KafkaFlow.Retry.Sample.Exceptions; using KafkaFlow.Retry.Sample.Messages; diff --git a/samples/KafkaFlow.Retry.Sample/Handlers/RetryForeverTestHandler.cs b/samples/KafkaFlow.Retry.Sample/Handlers/RetryForeverTestHandler.cs index 12cabaa6..a3d06eb6 100644 --- a/samples/KafkaFlow.Retry.Sample/Handlers/RetryForeverTestHandler.cs +++ b/samples/KafkaFlow.Retry.Sample/Handlers/RetryForeverTestHandler.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using KafkaFlow; using KafkaFlow.Retry.Sample.Exceptions; using KafkaFlow.Retry.Sample.Messages; diff --git a/samples/KafkaFlow.Retry.Sample/Handlers/RetrySimpleTestHandler.cs b/samples/KafkaFlow.Retry.Sample/Handlers/RetrySimpleTestHandler.cs index 039582d6..ed5929df 100644 --- a/samples/KafkaFlow.Retry.Sample/Handlers/RetrySimpleTestHandler.cs +++ b/samples/KafkaFlow.Retry.Sample/Handlers/RetrySimpleTestHandler.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using KafkaFlow; using KafkaFlow.Retry.Sample.Exceptions; using KafkaFlow.Retry.Sample.Messages; diff --git a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs index 9518a331..cbdde234 100644 --- a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs +++ b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs @@ -1,4 +1,5 @@ using System; +using Confluent.Kafka; using KafkaFlow.Configuration; using KafkaFlow.Retry.MongoDb; using KafkaFlow.Retry.Sample.Exceptions; @@ -18,263 +19,264 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoDb( string mongoDbRetryQueueCollectionName, string mongoDbRetryQueueItemCollectionName) { - cluster - .AddProducer( - "kafka-flow-retry-durable-mongodb-producer", - producer => producer - .DefaultTopic("sample-kafka-flow-retry-durable-mongodb-topic") - .WithCompression(Confluent.Kafka.CompressionType.Gzip) - .AddMiddlewares( - middlewares => middlewares - .AddSerializer() - ) - .WithAcks(Acks.All) - ) - .AddConsumer( - consumer => consumer - .Topic("sample-kafka-flow-retry-durable-mongodb-topic") - .WithGroupId("sample-consumer-kafka-flow-retry-durable-mongodb") - .WithName("kafka-flow-retry-durable-mongodb-consumer") - .WithBufferSize(10) - .WithWorkersCount(20) - .WithAutoOffsetReset(AutoOffsetReset.Latest) - .AddMiddlewares( - middlewares => middlewares - .AddDeserializer() - .RetryDurable( - configure => configure - .Handle() - .WithMessageType(typeof(RetryDurableTestMessage)) - .WithMongoDbDataProvider( - mongoDbConnectionString, - mongoDbDatabaseName, - mongoDbRetryQueueCollectionName, - mongoDbRetryQueueItemCollectionName) - .WithRetryPlanBeforeRetryDurable( - configure => configure - .TryTimes(3) - .WithTimeBetweenTriesPlan( - TimeSpan.FromMilliseconds(250), - TimeSpan.FromMilliseconds(500), - TimeSpan.FromMilliseconds(1000)) - .ShouldPauseConsumer(false) - ) - .WithEmbeddedRetryCluster( - cluster, - configure => configure - .WithRetryTopicName("sample-kafka-flow-retry-durable-mongodb-topic-retry") - .WithRetryConsumerBufferSize(4) - .WithRetryConsumerWorkersCount(2) - .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) - .WithRetryTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Transient) - .AddHandler() - ) - .Enabled(true) - ) - .WithPollingJobsConfiguration( - configure => configure - .WithSchedulerId("retry-durable-mongodb-polling-id") - .WithRetryDurablePollingConfiguration( - configure => configure - .WithCronExpression("0 0/1 * 1/1 * ? *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(10) - .Enabled(true) - ) - .WithCleanupPollingConfiguration( - configure => configure - .WithCronExpression("0 0 * 1/1 * ? *") - .WithRowsPerRequest(1048) - .WithTimeToLiveInDays(60) - .Enabled(true) - ) - )) - .AddTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Transient) - .AddHandler()) - ) - ); + cluster + .AddProducer( + "kafka-flow-retry-durable-mongodb-producer", + producer => producer + .DefaultTopic("sample-kafka-flow-retry-durable-mongodb-topic") + .WithCompression(CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + ) + .WithAcks(Acks.All) + ) + .AddConsumer( + consumer => consumer + .Topic("sample-kafka-flow-retry-durable-mongodb-topic") + .WithGroupId("sample-consumer-kafka-flow-retry-durable-mongodb") + .WithName("kafka-flow-retry-durable-mongodb-consumer") + .WithBufferSize(10) + .WithWorkersCount(20) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .RetryDurable( + configure => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMongoDbDataProvider( + mongoDbConnectionString, + mongoDbDatabaseName, + mongoDbRetryQueueCollectionName, + mongoDbRetryQueueItemCollectionName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false) + ) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .WithRetryTopicName("sample-kafka-flow-retry-durable-mongodb-topic-retry") + .WithRetryConsumerBufferSize(4) + .WithRetryConsumerWorkersCount(2) + .WithRetryConsumerStrategy( + RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler() + ) + .Enabled(true) + ) + .WithPollingJobsConfiguration( + configure => configure + .WithSchedulerId("retry-durable-mongodb-polling-id") + .WithRetryDurablePollingConfiguration( + configure => configure + .WithCronExpression("0 0/1 * 1/1 * ? *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(10) + .Enabled(true) + ) + .WithCleanupPollingConfiguration( + configure => configure + .WithCronExpression("0 0 * 1/1 * ? *") + .WithRowsPerRequest(1048) + .WithTimeToLiveInDays(60) + .Enabled(true) + ) + )) + .AddTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler()) + ) + ); - return cluster; - } + return cluster; + } internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer( this IClusterConfigurationBuilder cluster, string sqlServerConnectionString, string sqlServerDatabaseName) { - cluster - .AddProducer( - "kafka-flow-retry-durable-sqlserver-producer", - producer => producer - .DefaultTopic("sample-kafka-flow-retry-durable-sqlserver-topic") - .WithCompression(Confluent.Kafka.CompressionType.Gzip) - .AddMiddlewares( - middlewares => middlewares - .AddSerializer() - ) - .WithAcks(Acks.All) - ) - .AddConsumer( - consumer => consumer - .Topic("sample-kafka-flow-retry-durable-sqlserver-topic") - .WithGroupId("sample-consumer-kafka-flow-retry-durable-sqlserver") - .WithName("kafka-flow-retry-durable-sqlserver-consumer") - .WithBufferSize(10) - .WithWorkersCount(20) - .WithAutoOffsetReset(AutoOffsetReset.Latest) - .AddMiddlewares( - middlewares => middlewares - .AddDeserializer() - .RetryDurable( - configure => configure - .Handle() - .WithMessageType(typeof(RetryDurableTestMessage)) - .WithSqlServerDataProvider( - sqlServerConnectionString, - sqlServerDatabaseName) - .WithRetryPlanBeforeRetryDurable( - configure => configure - .TryTimes(3) - .WithTimeBetweenTriesPlan( - TimeSpan.FromMilliseconds(250), - TimeSpan.FromMilliseconds(500), - TimeSpan.FromMilliseconds(1000)) - .ShouldPauseConsumer(false) - ) - .WithEmbeddedRetryCluster( - cluster, - configure => configure - .WithRetryTopicName("sample-kafka-flow-retry-durable-sqlserver-topic-retry") - .WithRetryConsumerBufferSize(4) - .WithRetryConsumerWorkersCount(2) - .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) - .WithRetryTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Transient) - .AddHandler() - ) - .Enabled(true) - ) - .WithPollingJobsConfiguration( - configure => configure - .WithSchedulerId("retry-durable-sqlserver-polling-id") - .WithRetryDurablePollingConfiguration( - configure => configure - .WithCronExpression("0 0/1 * 1/1 * ? *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(10) - .Enabled(true) - ) - .WithCleanupPollingConfiguration( - configure => configure - .Enabled(false) - ) - )) - .AddTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Transient) - .AddHandler()) - ) - ); + cluster + .AddProducer( + "kafka-flow-retry-durable-sqlserver-producer", + producer => producer + .DefaultTopic("sample-kafka-flow-retry-durable-sqlserver-topic") + .WithCompression(CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + ) + .WithAcks(Acks.All) + ) + .AddConsumer( + consumer => consumer + .Topic("sample-kafka-flow-retry-durable-sqlserver-topic") + .WithGroupId("sample-consumer-kafka-flow-retry-durable-sqlserver") + .WithName("kafka-flow-retry-durable-sqlserver-consumer") + .WithBufferSize(10) + .WithWorkersCount(20) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .RetryDurable( + configure => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithSqlServerDataProvider( + sqlServerConnectionString, + sqlServerDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false) + ) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .WithRetryTopicName("sample-kafka-flow-retry-durable-sqlserver-topic-retry") + .WithRetryConsumerBufferSize(4) + .WithRetryConsumerWorkersCount(2) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler() + ) + .Enabled(true) + ) + .WithPollingJobsConfiguration( + configure => configure + .WithSchedulerId("retry-durable-sqlserver-polling-id") + .WithRetryDurablePollingConfiguration( + configure => configure + .WithCronExpression("0 0/1 * 1/1 * ? *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(10) + .Enabled(true) + ) + .WithCleanupPollingConfiguration( + configure => configure + .Enabled(false) + ) + )) + .AddTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler()) + ) + ); - return cluster; - } + return cluster; + } internal static IClusterConfigurationBuilder SetupRetryForever(this IClusterConfigurationBuilder cluster) { - cluster - .AddProducer( - "kafka-flow-retry-forever-producer", - producer => producer - .DefaultTopic("sample-kafka-flow-retry-forever-topic") - .WithCompression(Confluent.Kafka.CompressionType.Gzip) - .AddMiddlewares( - middlewares => middlewares - .AddSerializer() - ) - .WithAcks(Acks.All) - ) - .AddConsumer( - consumer => consumer - .Topic("sample-kafka-flow-retry-forever-topic") - .WithGroupId("sample-consumer-kafka-flow-retry-forever") - .WithName("kafka-flow-retry-forever-consumer") - .WithBufferSize(10) - .WithWorkersCount(20) - .WithAutoOffsetReset(AutoOffsetReset.Latest) - .AddMiddlewares( - middlewares => middlewares - .AddDeserializer() - .RetryForever( - (configure) => configure - .Handle() - .WithTimeBetweenTriesPlan( - TimeSpan.FromMilliseconds(500), - TimeSpan.FromMilliseconds(1000)) - ) - .AddTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Transient) - .AddHandler()) - ) - ); + cluster + .AddProducer( + "kafka-flow-retry-forever-producer", + producer => producer + .DefaultTopic("sample-kafka-flow-retry-forever-topic") + .WithCompression(CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + ) + .WithAcks(Acks.All) + ) + .AddConsumer( + consumer => consumer + .Topic("sample-kafka-flow-retry-forever-topic") + .WithGroupId("sample-consumer-kafka-flow-retry-forever") + .WithName("kafka-flow-retry-forever-consumer") + .WithBufferSize(10) + .WithWorkersCount(20) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .RetryForever( + configure => configure + .Handle() + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + ) + .AddTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler()) + ) + ); - return cluster; - } + return cluster; + } internal static IClusterConfigurationBuilder SetupRetrySimple(this IClusterConfigurationBuilder cluster) { - cluster - .AddProducer( - "kafka-flow-retry-simple-producer", - producer => producer - .DefaultTopic("sample-kafka-flow-retry-simple-topic") - .WithCompression(Confluent.Kafka.CompressionType.Gzip) - .AddMiddlewares( - middlewares => middlewares - .AddSerializer() - ) - .WithAcks(Acks.All) - ) - .AddConsumer( - consumer => consumer - .Topic("sample-kafka-flow-retry-simple-topic") - .WithGroupId("sample-consumer-kafka-flow-retry-simple") - .WithName("kafka-flow-retry-simple-consumer") - .WithBufferSize(10) - .WithWorkersCount(20) - .WithAutoOffsetReset(AutoOffsetReset.Latest) - .AddMiddlewares( - middlewares => middlewares - .AddDeserializer() - .RetrySimple( - (configure) => configure - .Handle() - .TryTimes(2) - .WithTimeBetweenTriesPlan((retryCount) => + cluster + .AddProducer( + "kafka-flow-retry-simple-producer", + producer => producer + .DefaultTopic("sample-kafka-flow-retry-simple-topic") + .WithCompression(CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + ) + .WithAcks(Acks.All) + ) + .AddConsumer( + consumer => consumer + .Topic("sample-kafka-flow-retry-simple-topic") + .WithGroupId("sample-consumer-kafka-flow-retry-simple") + .WithName("kafka-flow-retry-simple-consumer") + .WithBufferSize(10) + .WithWorkersCount(20) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .RetrySimple( + configure => configure + .Handle() + .TryTimes(2) + .WithTimeBetweenTriesPlan(retryCount => + { + var plan = new[] { - var plan = new[] - { TimeSpan.FromMilliseconds(1500), TimeSpan.FromMilliseconds(2000), TimeSpan.FromMilliseconds(2000) - }; + }; - return plan[retryCount]; - }) - .ShouldPauseConsumer(false) - ) - .AddTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Transient) - .AddHandler()) - ) - ); + return plan[retryCount]; + }) + .ShouldPauseConsumer(false) + ) + .AddTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler()) + ) + ); - return cluster; - } + return cluster; + } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.Sample/Messages/RetryDurableTestMessage.cs b/samples/KafkaFlow.Retry.Sample/Messages/RetryDurableTestMessage.cs index e70b8619..23069989 100644 --- a/samples/KafkaFlow.Retry.Sample/Messages/RetryDurableTestMessage.cs +++ b/samples/KafkaFlow.Retry.Sample/Messages/RetryDurableTestMessage.cs @@ -5,6 +5,5 @@ namespace KafkaFlow.Retry.Sample.Messages; [DataContract] public class RetryDurableTestMessage { - [DataMember(Order = 1)] - public string Text { get; set; } + [DataMember(Order = 1)] public string Text { get; set; } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.Sample/Messages/RetryForeverTestMessage.cs b/samples/KafkaFlow.Retry.Sample/Messages/RetryForeverTestMessage.cs index 5b972b12..705c454d 100644 --- a/samples/KafkaFlow.Retry.Sample/Messages/RetryForeverTestMessage.cs +++ b/samples/KafkaFlow.Retry.Sample/Messages/RetryForeverTestMessage.cs @@ -5,6 +5,5 @@ namespace KafkaFlow.Retry.Sample.Messages; [DataContract] public class RetryForeverTestMessage { - [DataMember(Order = 1)] - public string Text { get; set; } + [DataMember(Order = 1)] public string Text { get; set; } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.Sample/Program.cs b/samples/KafkaFlow.Retry.Sample/Program.cs index 1aaba0b7..d97cfec8 100644 --- a/samples/KafkaFlow.Retry.Sample/Program.cs +++ b/samples/KafkaFlow.Retry.Sample/Program.cs @@ -3,11 +3,11 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; using KafkaFlow.Producers; using KafkaFlow.Retry.Common.Sample.Helpers; using KafkaFlow.Retry.Sample.Helpers; using KafkaFlow.Retry.Sample.Messages; +using Microsoft.Extensions.DependencyInjection; namespace KafkaFlow.Retry.Sample; @@ -21,7 +21,8 @@ private static async Task Main() var mongoDbDatabaseName = "kafka_flow_retry_durable_sample"; var mongoDbRetryQueueCollectionName = "RetryQueues"; var mongoDbRetryQueueItemCollectionName = "RetryQueueItems"; - var sqlServerConnectionString = "Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample"; + var sqlServerConnectionString = + "Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample"; var sqlServerDatabaseName = "kafka_flow_retry_durable_sample"; var topics = new[] { @@ -30,7 +31,7 @@ private static async Task Main() "sample-kafka-flow-retry-durable-sqlserver-topic", "sample-kafka-flow-retry-durable-sqlserver-topic-retry", "sample-kafka-flow-retry-durable-mongodb-topic", - "sample-kafka-flow-retry-durable-mongodb-topic-retry", + "sample-kafka-flow-retry-durable-mongodb-topic-retry" }; SqlServerHelper.RecreateSqlSchema(sqlServerDatabaseName, sqlServerConnectionString).GetAwaiter().GetResult(); @@ -182,7 +183,8 @@ await producers["kafka-flow-retry-simple-producer"] break; default: - Console.Write("USE: retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver or exit: "); + Console.Write( + "USE: retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver or exit: "); break; } } diff --git a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/ContractResolvers/WritablePropertiesOnlyResolver.cs b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/ContractResolvers/WritablePropertiesOnlyResolver.cs index 7320c7ff..dca61a68 100644 --- a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/ContractResolvers/WritablePropertiesOnlyResolver.cs +++ b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/ContractResolvers/WritablePropertiesOnlyResolver.cs @@ -10,7 +10,7 @@ internal class WritablePropertiesOnlyResolver : DefaultContractResolver { protected override IList CreateProperties(Type type, MemberSerialization memberSerialization) { - IList props = base.CreateProperties(type, memberSerialization); + var props = base.CreateProperties(type, memberSerialization); return props.Where(p => p.Writable).ToList(); } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Exceptions/RetryDurableTestException.cs b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Exceptions/RetryDurableTestException.cs index 00376ba0..1a2b2cd0 100644 --- a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Exceptions/RetryDurableTestException.cs +++ b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Exceptions/RetryDurableTestException.cs @@ -5,5 +5,6 @@ namespace KafkaFlow.Retry.SchemaRegistry.Sample.Exceptions; public class RetryDurableTestException : Exception { public RetryDurableTestException(string message) : base(message) - { } + { + } } \ No newline at end of file diff --git a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Handlers/AvroMessageTestHandler.cs b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Handlers/AvroMessageTestHandler.cs index d69f92b7..56b7edb8 100644 --- a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Handlers/AvroMessageTestHandler.cs +++ b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Handlers/AvroMessageTestHandler.cs @@ -1,7 +1,7 @@ using System; using System.Threading.Tasks; -using SchemaRegistry; using KafkaFlow.Retry.SchemaRegistry.Sample.Exceptions; +using SchemaRegistry; namespace KafkaFlow.Retry.SchemaRegistry.Sample.Handlers; diff --git a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs index 6f67fcaf..336e20e0 100644 --- a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs +++ b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs @@ -1,13 +1,14 @@ using System; +using Confluent.Kafka; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; -using SchemaRegistry; using KafkaFlow.Configuration; using KafkaFlow.Retry.MongoDb; using KafkaFlow.Retry.SchemaRegistry.Sample.ContractResolvers; using KafkaFlow.Retry.SchemaRegistry.Sample.Exceptions; using KafkaFlow.Retry.SchemaRegistry.Sample.Handlers; using Newtonsoft.Json; +using SchemaRegistry; namespace KafkaFlow.Retry.SchemaRegistry.Sample.Helpers; @@ -25,7 +26,7 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoAvroDb( "kafka-flow-retry-durable-mongodb-avro-producer", producer => producer .DefaultTopic("sample-kafka-flow-retry-durable-mongodb-avro-topic") - .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .WithCompression(CompressionType.Gzip) .AddMiddlewares( middlewares => middlewares .AddSchemaRegistryAvroSerializer( @@ -72,10 +73,12 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoAvroDb( .WithEmbeddedRetryCluster( cluster, configure => configure - .WithRetryTopicName("sample-kafka-flow-retry-durable-mongodb-avro-topic-retry") + .WithRetryTopicName( + "sample-kafka-flow-retry-durable-mongodb-avro-topic-retry") .WithRetryConsumerBufferSize(4) .WithRetryConsumerWorkersCount(2) - .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryConsumerStrategy( + RetryConsumerStrategy.GuaranteeOrderedConsumption) .WithRetryTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Transient) diff --git a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Program.cs b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Program.cs index 88026d92..eb9ea6ef 100644 --- a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Program.cs +++ b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Program.cs @@ -1,11 +1,11 @@ using System; using System.Threading; using System.Threading.Tasks; -using SchemaRegistry; using KafkaFlow.Producers; using KafkaFlow.Retry.Common.Sample.Helpers; using KafkaFlow.Retry.SchemaRegistry.Sample.Helpers; using Microsoft.Extensions.DependencyInjection; +using SchemaRegistry; namespace KafkaFlow.Retry.SchemaRegistry.Sample; @@ -23,7 +23,7 @@ private static async Task Main(string[] args) var topics = new[] { "sample-kafka-flow-retry-durable-mongodb-avro-topic", - "sample-kafka-flow-retry-durable-mongodb-avro-topic-retry", + "sample-kafka-flow-retry-durable-mongodb-avro-topic-retry" }; KafkaHelper.CreateKafkaTopics(brokers, topics).GetAwaiter().GetResult(); diff --git a/src/KafkaFlow.Retry.API/Adapters/Common/Parsers/EnumParser.cs b/src/KafkaFlow.Retry.API/Adapters/Common/Parsers/EnumParser.cs index 69d64e3c..c3cd0894 100644 --- a/src/KafkaFlow.Retry.API/Adapters/Common/Parsers/EnumParser.cs +++ b/src/KafkaFlow.Retry.API/Adapters/Common/Parsers/EnumParser.cs @@ -9,24 +9,24 @@ internal class EnumParser : IQueryParametersParser where T : struct { public IEnumerable Parse(IEnumerable parameters, IEnumerable defaultValue) { - Guard.Argument(parameters, nameof(parameters)).NotNull(); - Guard.Argument(defaultValue, nameof(defaultValue)).NotNull(); + Guard.Argument(parameters, nameof(parameters)).NotNull(); + Guard.Argument(defaultValue, nameof(defaultValue)).NotNull(); - var items = new List(); + var items = new List(); - if (parameters.Any()) + if (parameters.Any()) + { + foreach (var param in parameters) { - foreach (var param in parameters) + if (Enum.TryParse(param, out var item)) { - if (Enum.TryParse(param, out var item)) - { - items.Add(item); - } + items.Add(item); } - - return items; } - return defaultValue; + return items; } + + return defaultValue; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.API/Adapters/Common/RetryQueueItemAdapter.cs b/src/KafkaFlow.Retry.API/Adapters/Common/RetryQueueItemAdapter.cs index 26511658..7ec988b8 100644 --- a/src/KafkaFlow.Retry.API/Adapters/Common/RetryQueueItemAdapter.cs +++ b/src/KafkaFlow.Retry.API/Adapters/Common/RetryQueueItemAdapter.cs @@ -8,28 +8,28 @@ internal class RetryQueueItemAdapter : IRetryQueueItemAdapter { public RetryQueueItemDto Adapt(RetryQueueItem item, string queueGroupKey) { - Guard.Argument(item, nameof(item)).NotNull(); - Guard.Argument(item.Message, nameof(item.Message)).NotNull(); + Guard.Argument(item, nameof(item)).NotNull(); + Guard.Argument(item.Message, nameof(item.Message)).NotNull(); - return new RetryQueueItemDto() + return new RetryQueueItemDto + { + Id = item.Id, + Status = item.Status, + SeverityLevel = item.SeverityLevel, + AttemptsCount = item.AttemptsCount, + CreationDate = item.CreationDate, + LastExecution = item.LastExecution, + Sort = item.Sort, + MessageInfo = new RetryQueuetItemMessageInfoDto { - Id = item.Id, - Status = item.Status, - SeverityLevel = item.SeverityLevel, - AttemptsCount = item.AttemptsCount, - CreationDate = item.CreationDate, - LastExecution = item.LastExecution, - Sort = item.Sort, - MessageInfo = new RetryQueuetItemMessageInfoDto() - { - Key = item.Message.Key, - Offset = item.Message.Offset, - Partition = item.Message.Partition, - Topic = item.Message.TopicName, - UtcTimeStamp = item.Message.UtcTimeStamp - }, - Description = item.Description, - QueueGroupKey = queueGroupKey - }; - } + Key = item.Message.Key, + Offset = item.Message.Offset, + Partition = item.Message.Partition, + Topic = item.Message.TopicName, + UtcTimeStamp = item.Message.UtcTimeStamp + }, + Description = item.Description, + QueueGroupKey = queueGroupKey + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.API/Adapters/GetItems/GetItemsRequestDtoReader.cs b/src/KafkaFlow.Retry.API/Adapters/GetItems/GetItemsRequestDtoReader.cs index f196b87e..7e7adc15 100644 --- a/src/KafkaFlow.Retry.API/Adapters/GetItems/GetItemsRequestDtoReader.cs +++ b/src/KafkaFlow.Retry.API/Adapters/GetItems/GetItemsRequestDtoReader.cs @@ -12,7 +12,10 @@ internal class GetItemsRequestDtoReader : IGetItemsRequestDtoReader { private const int DefaultTopItemsByQueueValue = 100; private const int DefaultTopQueuesValue = 10000; - private readonly IEnumerable _defaultItemsStatuses = new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting, RetryQueueItemStatus.InRetry }; + + private readonly IEnumerable _defaultItemsStatuses = + new[] { RetryQueueItemStatus.Waiting, RetryQueueItemStatus.InRetry }; + private readonly IEnumerable _defaultSeverityLevels = Enumerable.Empty(); private readonly EnumParser _severitiesParser; @@ -31,12 +34,16 @@ public GetItemsRequestDto Read(HttpRequest request) var topQueues = request.ReadQueryParams("topqueues"); var topItemsByQueue = request.ReadQueryParams("topitemsbyqueue"); - return new GetItemsRequestDto() + return new GetItemsRequestDto { ItemsStatuses = _statusesParser.Parse(statusIds, _defaultItemsStatuses), SeverityLevels = _severitiesParser.Parse(severityIds, _defaultSeverityLevels), - TopQueues = int.TryParse(topQueues.LastOrDefault(), out int parsedTopQueues) ? parsedTopQueues : DefaultTopQueuesValue, - TopItemsByQueue = int.TryParse(topItemsByQueue.LastOrDefault(), out int parsedTopItemsByQueue) ? parsedTopItemsByQueue : DefaultTopItemsByQueueValue + TopQueues = int.TryParse(topQueues.LastOrDefault(), out var parsedTopQueues) + ? parsedTopQueues + : DefaultTopQueuesValue, + TopItemsByQueue = int.TryParse(topItemsByQueue.LastOrDefault(), out var parsedTopItemsByQueue) + ? parsedTopItemsByQueue + : DefaultTopItemsByQueueValue }; } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.API/Adapters/UpdateQueues/UpdateQueuesResponseDtoAdapter.cs b/src/KafkaFlow.Retry.API/Adapters/UpdateQueues/UpdateQueuesResponseDtoAdapter.cs index 457bcb3a..211d765d 100644 --- a/src/KafkaFlow.Retry.API/Adapters/UpdateQueues/UpdateQueuesResponseDtoAdapter.cs +++ b/src/KafkaFlow.Retry.API/Adapters/UpdateQueues/UpdateQueuesResponseDtoAdapter.cs @@ -14,7 +14,8 @@ public UpdateQueuesResponseDto Adapt(UpdateQueuesResult updateQueuesResult) foreach (var res in updateQueuesResult.Results) { - resultDto.UpdateQueuesResults.Add(new UpdateQueueResultDto(res.QueueGroupKey, res.Status, res.RetryQueueStatus)); + resultDto.UpdateQueuesResults.Add(new UpdateQueueResultDto(res.QueueGroupKey, res.Status, + res.RetryQueueStatus)); } return resultDto; diff --git a/src/KafkaFlow.Retry.API/Dtos/GetItemsResponseDto.cs b/src/KafkaFlow.Retry.API/Dtos/GetItemsResponseDto.cs index 6e650093..b6f2ed0c 100644 --- a/src/KafkaFlow.Retry.API/Dtos/GetItemsResponseDto.cs +++ b/src/KafkaFlow.Retry.API/Dtos/GetItemsResponseDto.cs @@ -2,12 +2,13 @@ using KafkaFlow.Retry.API.Dtos.Common; namespace KafkaFlow.Retry.API.Dtos; + public class GetItemsResponseDto { public GetItemsResponseDto(IEnumerable queueItemDtos) { - QueueItems = queueItemDtos; - } + QueueItems = queueItemDtos; + } public IEnumerable QueueItems { get; set; } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.API/Dtos/UpdateItemResultDto.cs b/src/KafkaFlow.Retry.API/Dtos/UpdateItemResultDto.cs index 75a50768..fc4120b8 100644 --- a/src/KafkaFlow.Retry.API/Dtos/UpdateItemResultDto.cs +++ b/src/KafkaFlow.Retry.API/Dtos/UpdateItemResultDto.cs @@ -7,9 +7,9 @@ public class UpdateItemResultDto { public UpdateItemResultDto(Guid itemId, UpdateItemResultStatus value) { - ItemId = itemId; - Result = value.ToString(); - } + ItemId = itemId; + Result = value.ToString(); + } public Guid ItemId { get; set; } diff --git a/src/KafkaFlow.Retry.API/Dtos/UpdateItemsResponseDto.cs b/src/KafkaFlow.Retry.API/Dtos/UpdateItemsResponseDto.cs index 0bdacc23..c325feea 100644 --- a/src/KafkaFlow.Retry.API/Dtos/UpdateItemsResponseDto.cs +++ b/src/KafkaFlow.Retry.API/Dtos/UpdateItemsResponseDto.cs @@ -6,8 +6,8 @@ public class UpdateItemsResponseDto { public UpdateItemsResponseDto() { - UpdateItemsResults = new List(); - } + UpdateItemsResults = new List(); + } public IList UpdateItemsResults { get; set; } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.API/Handlers/GetItemsHandler.cs b/src/KafkaFlow.Retry.API/Handlers/GetItemsHandler.cs index eae704ed..2a08e7d7 100644 --- a/src/KafkaFlow.Retry.API/Handlers/GetItemsHandler.cs +++ b/src/KafkaFlow.Retry.API/Handlers/GetItemsHandler.cs @@ -1,4 +1,5 @@ -using System.Net; +using System; +using System.Net; using System.Threading.Tasks; using Dawn; using KafkaFlow.Retry.API.Adapters.GetItems; @@ -48,7 +49,7 @@ protected override async Task HandleRequestAsync(HttpRequest request, HttpRespon await WriteResponseAsync(response, responseDto, (int)HttpStatusCode.OK).ConfigureAwait(false); } - catch (System.Exception ex) + catch (Exception ex) { await WriteResponseAsync(response, ex, (int)HttpStatusCode.InternalServerError).ConfigureAwait(false); } diff --git a/src/KafkaFlow.Retry.API/HttpExtensions.cs b/src/KafkaFlow.Retry.API/HttpExtensions.cs index 434ef7c5..4f79f4c8 100644 --- a/src/KafkaFlow.Retry.API/HttpExtensions.cs +++ b/src/KafkaFlow.Retry.API/HttpExtensions.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using Microsoft.AspNetCore.Http; namespace KafkaFlow.Retry.API; @@ -17,7 +16,7 @@ public static void AddQueryParams(this HttpRequest httpRequest, string name, str public static string ExtendResourcePath(this string resource, string extension) { - return String.Concat(resource, ResourcePathDelimiter, extension); + return string.Concat(resource, ResourcePathDelimiter, extension); } public static IEnumerable ReadQueryParams(this HttpRequest httpRequest, string paramKey) diff --git a/src/KafkaFlow.Retry.API/RetryRequestHandlerBase.cs b/src/KafkaFlow.Retry.API/RetryRequestHandlerBase.cs index ae3cf04d..6cee44cd 100644 --- a/src/KafkaFlow.Retry.API/RetryRequestHandlerBase.cs +++ b/src/KafkaFlow.Retry.API/RetryRequestHandlerBase.cs @@ -9,89 +9,89 @@ namespace KafkaFlow.Retry.API; internal abstract class RetryRequestHandlerBase : IHttpRequestHandler { + private const string RetryResource = "retry"; private readonly string _path; - private const string RetryResource = "retry"; - protected JsonSerializerSettings JsonSerializerSettings = new JsonSerializerSettings() + protected JsonSerializerSettings JsonSerializerSettings = new() { DateTimeZoneHandling = DateTimeZoneHandling.Utc, TypeNameHandling = TypeNameHandling.None }; - protected abstract HttpMethod HttpMethod { get; } - protected RetryRequestHandlerBase(string endpointPrefix, string resource) { - Guard.Argument(resource, nameof(resource)).NotNull().NotEmpty(); - - if (!string.IsNullOrEmpty(endpointPrefix)) - { - _path = _path - .ExtendResourcePath(endpointPrefix); - } + Guard.Argument(resource, nameof(resource)).NotNull().NotEmpty(); + if (!string.IsNullOrEmpty(endpointPrefix)) + { _path = _path - .ExtendResourcePath(RetryResource) - .ExtendResourcePath(resource); + .ExtendResourcePath(endpointPrefix); } + _path = _path + .ExtendResourcePath(RetryResource) + .ExtendResourcePath(resource); + } + + protected abstract HttpMethod HttpMethod { get; } + public virtual async Task HandleAsync(HttpRequest request, HttpResponse response) { - if (!CanHandle(request)) - { - return false; - } + if (!CanHandle(request)) + { + return false; + } - await HandleRequestAsync(request, response).ConfigureAwait(false); + await HandleRequestAsync(request, response).ConfigureAwait(false); - return true; - } + return true; + } protected bool CanHandle(HttpRequest httpRequest) { - var resource = httpRequest.Path.ToUriComponent(); + var resource = httpRequest.Path.ToUriComponent(); - if (!resource.Equals(_path)) - { - return false; - } - - var method = httpRequest.Method; + if (!resource.Equals(_path)) + { + return false; + } - if (!method.Equals(HttpMethod.ToString())) - { - return false; - } + var method = httpRequest.Method; - return true; + if (!method.Equals(HttpMethod.ToString())) + { + return false; } + return true; + } + protected abstract Task HandleRequestAsync(HttpRequest request, HttpResponse response); protected virtual async Task ReadRequestDtoAsync(HttpRequest request) { - string requestMessage; + string requestMessage; - using (var reader = new StreamReader(request.Body, Encoding.UTF8)) - { - requestMessage = await reader.ReadToEndAsync().ConfigureAwait(false); - } + using (var reader = new StreamReader(request.Body, Encoding.UTF8)) + { + requestMessage = await reader.ReadToEndAsync().ConfigureAwait(false); + } - var requestDto = JsonConvert.DeserializeObject(requestMessage, JsonSerializerSettings); + var requestDto = JsonConvert.DeserializeObject(requestMessage, JsonSerializerSettings); - return requestDto; - } + return requestDto; + } protected virtual async Task WriteResponseAsync(HttpResponse response, T responseDto, int statusCode) { - var body = JsonConvert.SerializeObject(responseDto, JsonSerializerSettings); + var body = JsonConvert.SerializeObject(responseDto, JsonSerializerSettings); - response.ContentType = "application/json"; - response.StatusCode = statusCode; + response.ContentType = "application/json"; + response.StatusCode = statusCode; - await response.WriteAsync(body, Encoding.UTF8).ConfigureAwait(false); - } + await response.WriteAsync(body, Encoding.UTF8).ConfigureAwait(false); + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Adapters/HeaderAdapter.cs b/src/KafkaFlow.Retry.MongoDb/Adapters/HeaderAdapter.cs index b45b5825..8b1727ac 100644 --- a/src/KafkaFlow.Retry.MongoDb/Adapters/HeaderAdapter.cs +++ b/src/KafkaFlow.Retry.MongoDb/Adapters/HeaderAdapter.cs @@ -9,19 +9,19 @@ internal class HeaderAdapter : IHeaderAdapter { public RetryQueueHeaderDbo Adapt(MessageHeader header) { - Guard.Argument(header, nameof(header)).NotNull(); + Guard.Argument(header, nameof(header)).NotNull(); - return new RetryQueueHeaderDbo - { - Key = header.Key, - Value = header.Value - }; - } + return new RetryQueueHeaderDbo + { + Key = header.Key, + Value = header.Value + }; + } public MessageHeader Adapt(RetryQueueHeaderDbo headerDbo) { - Guard.Argument(headerDbo, nameof(headerDbo)).NotNull(); + Guard.Argument(headerDbo, nameof(headerDbo)).NotNull(); - return new MessageHeader(headerDbo.Key, headerDbo.Value); - } + return new MessageHeader(headerDbo.Key, headerDbo.Value); + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Adapters/ItemAdapter.cs b/src/KafkaFlow.Retry.MongoDb/Adapters/ItemAdapter.cs index 73ff05c9..b775296e 100644 --- a/src/KafkaFlow.Retry.MongoDb/Adapters/ItemAdapter.cs +++ b/src/KafkaFlow.Retry.MongoDb/Adapters/ItemAdapter.cs @@ -11,28 +11,28 @@ internal class ItemAdapter : IItemAdapter public ItemAdapter(IMessageAdapter messageAdater) { - Guard.Argument(messageAdater, nameof(messageAdater)).NotNull(); + Guard.Argument(messageAdater, nameof(messageAdater)).NotNull(); - _messageAdapter = messageAdater; - } + _messageAdapter = messageAdater; + } public RetryQueueItem Adapt(RetryQueueItemDbo itemDbo) { - Guard.Argument(itemDbo, nameof(itemDbo)).NotNull(); + Guard.Argument(itemDbo, nameof(itemDbo)).NotNull(); - return new RetryQueueItem( - itemDbo.Id, - itemDbo.AttemptsCount, - itemDbo.CreationDate, - itemDbo.Sort, - itemDbo.LastExecution, - itemDbo.ModifiedStatusDate, - itemDbo.Status, - itemDbo.SeverityLevel, - itemDbo.Description - ) - { - Message = _messageAdapter.Adapt(itemDbo.Message) - }; - } + return new RetryQueueItem( + itemDbo.Id, + itemDbo.AttemptsCount, + itemDbo.CreationDate, + itemDbo.Sort, + itemDbo.LastExecution, + itemDbo.ModifiedStatusDate, + itemDbo.Status, + itemDbo.SeverityLevel, + itemDbo.Description + ) + { + Message = _messageAdapter.Adapt(itemDbo.Message) + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Adapters/MessageAdapter.cs b/src/KafkaFlow.Retry.MongoDb/Adapters/MessageAdapter.cs index ca35db32..c87d6d04 100644 --- a/src/KafkaFlow.Retry.MongoDb/Adapters/MessageAdapter.cs +++ b/src/KafkaFlow.Retry.MongoDb/Adapters/MessageAdapter.cs @@ -12,38 +12,38 @@ internal class MessageAdapter : IMessageAdapter public MessageAdapter(IHeaderAdapter headerAdapter) { - Guard.Argument(headerAdapter, nameof(headerAdapter)).NotNull(); + Guard.Argument(headerAdapter, nameof(headerAdapter)).NotNull(); - _headerAdapter = headerAdapter; - } + _headerAdapter = headerAdapter; + } public RetryQueueItemMessage Adapt(RetryQueueItemMessageDbo messageDbo) { - Guard.Argument(messageDbo, nameof(messageDbo)).NotNull(); - - return new RetryQueueItemMessage( - messageDbo.TopicName, - messageDbo.Key, - messageDbo.Value, - messageDbo.Partition, - messageDbo.Offset, - messageDbo.UtcTimeStamp, - messageDbo.Headers?.Select(headerDbo => _headerAdapter.Adapt(headerDbo))); - } + Guard.Argument(messageDbo, nameof(messageDbo)).NotNull(); + + return new RetryQueueItemMessage( + messageDbo.TopicName, + messageDbo.Key, + messageDbo.Value, + messageDbo.Partition, + messageDbo.Offset, + messageDbo.UtcTimeStamp, + messageDbo.Headers?.Select(headerDbo => _headerAdapter.Adapt(headerDbo))); + } public RetryQueueItemMessageDbo Adapt(RetryQueueItemMessage message) { - Guard.Argument(message, nameof(message)).NotNull(); - - return new RetryQueueItemMessageDbo - { - Key = message.Key, - Value = message.Value, - Offset = message.Offset, - Partition = message.Partition, - TopicName = message.TopicName, - UtcTimeStamp = message.UtcTimeStamp, - Headers = message.Headers.Select(h => _headerAdapter.Adapt(h)) - }; - } + Guard.Argument(message, nameof(message)).NotNull(); + + return new RetryQueueItemMessageDbo + { + Key = message.Key, + Value = message.Value, + Offset = message.Offset, + Partition = message.Partition, + TopicName = message.TopicName, + UtcTimeStamp = message.UtcTimeStamp, + Headers = message.Headers.Select(h => _headerAdapter.Adapt(h)) + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs b/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs index 68af1525..1fd27a90 100644 --- a/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs +++ b/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs @@ -14,41 +14,41 @@ internal class QueuesAdapter : IQueuesAdapter public QueuesAdapter(IItemAdapter itemAdapter) { - Guard.Argument(itemAdapter, nameof(itemAdapter)).NotNull(); + Guard.Argument(itemAdapter, nameof(itemAdapter)).NotNull(); - _itemAdapter = itemAdapter; - } + _itemAdapter = itemAdapter; + } public IEnumerable Adapt(IEnumerable queuesDbo, IEnumerable itemsDbo) { - var queuesDictionary = new Dictionary + var queuesDictionary = new Dictionary + ( + queuesDbo.ToDictionary ( - queuesDbo.ToDictionary - ( - queueDbo => queueDbo.Id, - queueDbo => Adapt(queueDbo) - ) - ); - - foreach (var itemDbo in itemsDbo) - { - Guard.Argument(queuesDictionary.ContainsKey(itemDbo.RetryQueueId), nameof(itemDbo.RetryQueueId)) - .True($"{nameof(itemDbo.RetryQueueId)} not found in queues list."); - - queuesDictionary[itemDbo.RetryQueueId].AddItem(_itemAdapter.Adapt(itemDbo)); - } - - return queuesDictionary.Values; + queueDbo => queueDbo.Id, + queueDbo => Adapt(queueDbo) + ) + ); + + foreach (var itemDbo in itemsDbo) + { + Guard.Argument(queuesDictionary.ContainsKey(itemDbo.RetryQueueId), nameof(itemDbo.RetryQueueId)) + .True($"{nameof(itemDbo.RetryQueueId)} not found in queues list."); + + queuesDictionary[itemDbo.RetryQueueId].AddItem(_itemAdapter.Adapt(itemDbo)); } + return queuesDictionary.Values; + } + private RetryQueue Adapt(RetryQueueDbo queueDbo) { - return new RetryQueue( - queueDbo.Id, - queueDbo.SearchGroupKey, - queueDbo.QueueGroupKey, - queueDbo.CreationDate, - queueDbo.LastExecution, - queueDbo.Status); - } + return new RetryQueue( + queueDbo.Id, + queueDbo.SearchGroupKey, + queueDbo.QueueGroupKey, + queueDbo.CreationDate, + queueDbo.LastExecution, + queueDbo.Status); + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/DataProviderCreationException.cs b/src/KafkaFlow.Retry.MongoDb/DataProviderCreationException.cs index 57d06db2..96942b9e 100644 --- a/src/KafkaFlow.Retry.MongoDb/DataProviderCreationException.cs +++ b/src/KafkaFlow.Retry.MongoDb/DataProviderCreationException.cs @@ -10,10 +10,10 @@ public class DataProviderCreationException : Exception public DataProviderCreationException(string message, Exception innerException) : base(message, innerException) { - } + } public DataProviderCreationException(string message) : base(message) { - } + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/DataProviderCreationResult.cs b/src/KafkaFlow.Retry.MongoDb/DataProviderCreationResult.cs index 7ce0ebfd..3c5044ac 100644 --- a/src/KafkaFlow.Retry.MongoDb/DataProviderCreationResult.cs +++ b/src/KafkaFlow.Retry.MongoDb/DataProviderCreationResult.cs @@ -6,10 +6,10 @@ public class DataProviderCreationResult { internal DataProviderCreationResult(string message, IRetryDurableQueueRepositoryProvider result, bool success) { - Message = message; - Result = result; - Success = success; - } + Message = message; + Result = result; + Success = success; + } public string Message { get; } public IRetryDurableQueueRepositoryProvider Result { get; } diff --git a/src/KafkaFlow.Retry.MongoDb/DbContext.cs b/src/KafkaFlow.Retry.MongoDb/DbContext.cs index 7aa3672e..4d732015 100644 --- a/src/KafkaFlow.Retry.MongoDb/DbContext.cs +++ b/src/KafkaFlow.Retry.MongoDb/DbContext.cs @@ -2,6 +2,7 @@ using MongoDB.Driver; namespace KafkaFlow.Retry.MongoDb; + internal sealed class DbContext { private readonly IMongoDatabase _database; @@ -9,14 +10,17 @@ internal sealed class DbContext public DbContext(MongoDbSettings mongoDbSettings, IMongoClient mongoClient) { - _mongoDbSettings = mongoDbSettings; - MongoClient = mongoClient; + _mongoDbSettings = mongoDbSettings; + MongoClient = mongoClient; - _database = mongoClient.GetDatabase(_mongoDbSettings.DatabaseName); - } + _database = mongoClient.GetDatabase(_mongoDbSettings.DatabaseName); + } public IMongoClient MongoClient { get; } - public IMongoCollection RetryQueueItems => _database.GetCollection(_mongoDbSettings.RetryQueueItemCollectionName); - public IMongoCollection RetryQueues => _database.GetCollection(_mongoDbSettings.RetryQueueCollectionName); + public IMongoCollection RetryQueueItems => + _database.GetCollection(_mongoDbSettings.RetryQueueItemCollectionName); + + public IMongoCollection RetryQueues => + _database.GetCollection(_mongoDbSettings.RetryQueueCollectionName); } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Model/DboConfigurations.cs b/src/KafkaFlow.Retry.MongoDb/Model/DboConfigurations.cs index ac6e1c5a..c14b9d59 100644 --- a/src/KafkaFlow.Retry.MongoDb/Model/DboConfigurations.cs +++ b/src/KafkaFlow.Retry.MongoDb/Model/DboConfigurations.cs @@ -10,59 +10,59 @@ internal static class DboConfigurations { internal static void TryAddIndexes(DbContext dbContext) { - dbContext.RetryQueues.Indexes.CreateMany( - new CreateIndexModel[] - { - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.SearchGroupKey) - ), - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.QueueGroupKey), - new CreateIndexOptions { Unique = true } - ), - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.Status) - ), - new CreateIndexModel( - Builders.IndexKeys.Descending(x => x.CreationDate) - ), - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.LastExecution) - ) - } - ); + dbContext.RetryQueues.Indexes.CreateMany( + new CreateIndexModel[] + { + new( + Builders.IndexKeys.Ascending(x => x.SearchGroupKey) + ), + new( + Builders.IndexKeys.Ascending(x => x.QueueGroupKey), + new CreateIndexOptions { Unique = true } + ), + new( + Builders.IndexKeys.Ascending(x => x.Status) + ), + new( + Builders.IndexKeys.Descending(x => x.CreationDate) + ), + new( + Builders.IndexKeys.Ascending(x => x.LastExecution) + ) + } + ); - dbContext.RetryQueueItems.Indexes.CreateMany( - new CreateIndexModel[] - { - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.RetryQueueId) - ), - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.Status) - ), - new CreateIndexModel( - Builders.IndexKeys.Descending(x => x.SeverityLevel) - ), - new CreateIndexModel( - Builders.IndexKeys.Ascending(x => x.Sort) - ) - } - ); - } + dbContext.RetryQueueItems.Indexes.CreateMany( + new CreateIndexModel[] + { + new( + Builders.IndexKeys.Ascending(x => x.RetryQueueId) + ), + new( + Builders.IndexKeys.Ascending(x => x.Status) + ), + new( + Builders.IndexKeys.Descending(x => x.SeverityLevel) + ), + new( + Builders.IndexKeys.Ascending(x => x.Sort) + ) + } + ); + } internal static void TryRegisterClassMapppings() { - BsonClassMap.TryRegisterClassMap(cm => - { - cm.AutoMap(); - cm.MapIdProperty(q => q.Id).SetIdGenerator(new GuidGenerator()); - }); + BsonClassMap.TryRegisterClassMap(cm => + { + cm.AutoMap(); + cm.MapIdProperty(q => q.Id).SetIdGenerator(new GuidGenerator()); + }); - BsonClassMap.TryRegisterClassMap(cm => - { - cm.AutoMap(); - cm.MapIdProperty(q => q.Id).SetIdGenerator(new GuidGenerator()); - }); - } + BsonClassMap.TryRegisterClassMap(cm => + { + cm.AutoMap(); + cm.MapIdProperty(q => q.Id).SetIdGenerator(new GuidGenerator()); + }); + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueDboFactory.cs b/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueDboFactory.cs index 643294f9..56389e6c 100644 --- a/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueDboFactory.cs +++ b/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueDboFactory.cs @@ -7,15 +7,15 @@ internal static class RetryQueueDboFactory { internal static RetryQueueDbo Create(SaveToQueueInput input) { - Guard.Argument(input).NotNull(); + Guard.Argument(input).NotNull(); - return new RetryQueueDbo - { - SearchGroupKey = input.SearchGroupKey, - QueueGroupKey = input.QueueGroupKey, - CreationDate = input.CreationDate, - LastExecution = input.LastExecution.Value, - Status = input.QueueStatus - }; - } + return new RetryQueueDbo + { + SearchGroupKey = input.SearchGroupKey, + QueueGroupKey = input.QueueGroupKey, + CreationDate = input.CreationDate, + LastExecution = input.LastExecution.Value, + Status = input.QueueStatus + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueItemDboFactory.cs b/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueItemDboFactory.cs index 46d42d24..e840edf7 100644 --- a/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueItemDboFactory.cs +++ b/src/KafkaFlow.Retry.MongoDb/Model/Factories/RetryQueueItemDboFactory.cs @@ -11,27 +11,27 @@ internal class RetryQueueItemDboFactory public RetryQueueItemDboFactory(IMessageAdapter messageAdapter) { - _messageAdapter = messageAdapter; - } + _messageAdapter = messageAdapter; + } public RetryQueueItemDbo Create(SaveToQueueInput input, Guid queueId, int sort = 0) { - Guard.Argument(input, nameof(input)).NotNull(); - Guard.Argument(queueId).NotDefault(); - Guard.Argument(sort, nameof(sort)).NotNegative(); + Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(queueId).NotDefault(); + Guard.Argument(sort, nameof(sort)).NotNegative(); - return new RetryQueueItemDbo - { - CreationDate = input.CreationDate, - LastExecution = input.LastExecution, - ModifiedStatusDate = input.ModifiedStatusDate, - AttemptsCount = input.AttemptsCount, - Message = _messageAdapter.Adapt(input.Message), - RetryQueueId = queueId, - Sort = sort, - Status = input.ItemStatus, - SeverityLevel = input.SeverityLevel, - Description = input.Description - }; - } + return new RetryQueueItemDbo + { + CreationDate = input.CreationDate, + LastExecution = input.LastExecution, + ModifiedStatusDate = input.ModifiedStatusDate, + AttemptsCount = input.AttemptsCount, + Message = _messageAdapter.Adapt(input.Message), + RetryQueueId = queueId, + Sort = sort, + Status = input.ItemStatus, + SeverityLevel = input.SeverityLevel, + Description = input.Description + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/MongoDbDataProviderFactory.cs b/src/KafkaFlow.Retry.MongoDb/MongoDbDataProviderFactory.cs index 24f86798..de05457b 100644 --- a/src/KafkaFlow.Retry.MongoDb/MongoDbDataProviderFactory.cs +++ b/src/KafkaFlow.Retry.MongoDb/MongoDbDataProviderFactory.cs @@ -17,7 +17,8 @@ public MongoDbDataProviderFactory() public DataProviderCreationResult TryCreate(MongoDbSettings mongoDbSettings) { Guard.Argument(mongoDbSettings) - .NotNull($"It is mandatory to configure the factory before creating new instances of {nameof(IRetryDurableQueueRepositoryProvider)}. Make sure the Config method is executed before the Create method."); + .NotNull( + $"It is mandatory to configure the factory before creating new instances of {nameof(IRetryDurableQueueRepositoryProvider)}. Make sure the Config method is executed before the Create method."); try { var mongoClient = new MongoClient(mongoDbSettings.ConnectionString); diff --git a/src/KafkaFlow.Retry.MongoDb/MongoRepositoryCollectionExtensions.cs b/src/KafkaFlow.Retry.MongoDb/MongoRepositoryCollectionExtensions.cs index 5c6c1d29..ebfdfdd1 100644 --- a/src/KafkaFlow.Retry.MongoDb/MongoRepositoryCollectionExtensions.cs +++ b/src/KafkaFlow.Retry.MongoDb/MongoRepositoryCollectionExtensions.cs @@ -8,37 +8,43 @@ namespace KafkaFlow.Retry.MongoDb; [ExcludeFromCodeCoverage] internal static class MongoRepositoryCollectionExtensions { - public static async Task> GetAsync(this IMongoCollection collection, FilterDefinition filter, FindOptions options = null) + public static async Task> GetAsync( + this IMongoCollection collection, FilterDefinition filter, + FindOptions options = null) { - var data = new List(); + var data = new List(); - var cursor = await collection.FindAsync(filter, options).ConfigureAwait(false); + var cursor = await collection.FindAsync(filter, options).ConfigureAwait(false); - while (await cursor.MoveNextAsync().ConfigureAwait(false)) - { - data.AddRange(cursor.Current); - } - - return data; + while (await cursor.MoveNextAsync().ConfigureAwait(false)) + { + data.AddRange(cursor.Current); } - public static FilterDefinitionBuilder GetFilters(this IMongoCollection collection) + return data; + } + + public static FilterDefinitionBuilder GetFilters( + this IMongoCollection collection) { - return Builders.Filter; - } + return Builders.Filter; + } - public static async Task GetOneAsync(this IMongoCollection collection, FilterDefinition filter) + public static async Task GetOneAsync(this IMongoCollection collection, + FilterDefinition filter) { - return await collection.Find(filter).FirstOrDefaultAsync().ConfigureAwait(false); - } + return await collection.Find(filter).FirstOrDefaultAsync().ConfigureAwait(false); + } - public static SortDefinitionBuilder GetSortDefinition(this IMongoCollection collection) + public static SortDefinitionBuilder GetSortDefinition( + this IMongoCollection collection) { - return Builders.Sort; - } + return Builders.Sort; + } - public static UpdateDefinitionBuilder GetUpdateDefinition(this IMongoCollection collection) + public static UpdateDefinitionBuilder GetUpdateDefinition( + this IMongoCollection collection) { - return Builders.Update; - } + return Builders.Update; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs index e401ed42..97479916 100644 --- a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs @@ -18,42 +18,43 @@ internal class RetryQueueItemRepository : IRetryQueueItemRepository public RetryQueueItemRepository(DbContext dbContext) { - Guard.Argument(dbContext).NotNull(); + Guard.Argument(dbContext).NotNull(); - _dbContext = dbContext; - } + _dbContext = dbContext; + } public async Task AnyItemStillActiveAsync(Guid retryQueueId) { - var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); + var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); - var itemsFilter = itemsFilterBuilder.Eq(i => i.RetryQueueId, retryQueueId) - & itemsFilterBuilder.Nin(i => i.Status, new RetryQueueItemStatus[] { RetryQueueItemStatus.Done, RetryQueueItemStatus.Cancelled }); + var itemsFilter = itemsFilterBuilder.Eq(i => i.RetryQueueId, retryQueueId) + & itemsFilterBuilder.Nin(i => i.Status, + new[] { RetryQueueItemStatus.Done, RetryQueueItemStatus.Cancelled }); - var itemsDbo = await _dbContext.RetryQueueItems.GetAsync(itemsFilter).ConfigureAwait(false); + var itemsDbo = await _dbContext.RetryQueueItems.GetAsync(itemsFilter).ConfigureAwait(false); - return itemsDbo.Any(); - } + return itemsDbo.Any(); + } public async Task DeleteItemsAsync(IEnumerable queueIds) { - var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); + var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); - var deleteFilter = itemsFilterBuilder.In(i => i.RetryQueueId, queueIds); + var deleteFilter = itemsFilterBuilder.In(i => i.RetryQueueId, queueIds); - await _dbContext.RetryQueueItems.DeleteManyAsync(deleteFilter).ConfigureAwait(false); - } + await _dbContext.RetryQueueItems.DeleteManyAsync(deleteFilter).ConfigureAwait(false); + } public async Task GetItemAsync(Guid itemId) { - var queueItemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); + var queueItemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); - var queueItemsFilter = queueItemsFilterBuilder.Eq(q => q.Id, itemId); + var queueItemsFilter = queueItemsFilterBuilder.Eq(q => q.Id, itemId); - var items = await _dbContext.RetryQueueItems.GetAsync(queueItemsFilter).ConfigureAwait(false); + var items = await _dbContext.RetryQueueItems.GetAsync(queueItemsFilter).ConfigureAwait(false); - return items.FirstOrDefault(); - } + return items.FirstOrDefault(); + } public async Task> GetItemsAsync( IEnumerable queueIds, @@ -62,75 +63,76 @@ public async Task> GetItemsAsync( int? top = null, StuckStatusFilter stuckStatusFilter = null) { - var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); - - var itemsFilter = itemsFilterBuilder.In(i => i.RetryQueueId, queueIds); - - if (stuckStatusFilter is null) - { - itemsFilter &= itemsFilterBuilder.In(i => i.Status, statuses); - } - else - { - itemsFilter &= itemsFilterBuilder.Or( - itemsFilterBuilder.In(i => i.Status, statuses), - itemsFilterBuilder.Eq(i => i.Status, stuckStatusFilter.ItemStatus) - & itemsFilterBuilder.Lt(i => i.ModifiedStatusDate, DateTime.UtcNow.AddSeconds(-stuckStatusFilter.ExpirationInterval.TotalSeconds))); - } - - if (severities is object && severities.Any()) - { - itemsFilter &= itemsFilterBuilder.In(i => i.SeverityLevel, severities); - } - - var options = new FindOptions - { - Sort = _dbContext.RetryQueueItems.GetSortDefinition().Ascending(i => i.Sort), - Limit = top - }; - - return await _dbContext.RetryQueueItems.GetAsync(itemsFilter, options).ConfigureAwait(false); + var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); + + var itemsFilter = itemsFilterBuilder.In(i => i.RetryQueueId, queueIds); + + if (stuckStatusFilter is null) + { + itemsFilter &= itemsFilterBuilder.In(i => i.Status, statuses); + } + else + { + itemsFilter &= itemsFilterBuilder.Or( + itemsFilterBuilder.In(i => i.Status, statuses), + itemsFilterBuilder.Eq(i => i.Status, stuckStatusFilter.ItemStatus) + & itemsFilterBuilder.Lt(i => i.ModifiedStatusDate, + DateTime.UtcNow.AddSeconds(-stuckStatusFilter.ExpirationInterval.TotalSeconds))); } - public async Task IsFirstWaitingInQueue(RetryQueueItemDbo item) - { - var sortedItems = await GetItemsAsync( - new Guid[] { item.RetryQueueId }, - new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting }) - .ConfigureAwait(false); + if (severities is object && severities.Any()) + { + itemsFilter &= itemsFilterBuilder.In(i => i.SeverityLevel, severities); + } - if (sortedItems.Any() && item.Id == sortedItems.First().Id) - { - return true; - } + var options = new FindOptions + { + Sort = _dbContext.RetryQueueItems.GetSortDefinition().Ascending(i => i.Sort), + Limit = top + }; - return false; + return await _dbContext.RetryQueueItems.GetAsync(itemsFilter, options).ConfigureAwait(false); + } + + public async Task IsFirstWaitingInQueue(RetryQueueItemDbo item) + { + var sortedItems = await GetItemsAsync( + new[] { item.RetryQueueId }, + new[] { RetryQueueItemStatus.Waiting }) + .ConfigureAwait(false); + + if (sortedItems.Any() && item.Id == sortedItems.First().Id) + { + return true; } + return false; + } + public async Task UpdateItemAsync( Guid itemId, RetryQueueItemStatus status, int attemptsCount, DateTime? lastExecution, string description) { - var filter = _dbContext.RetryQueueItems.GetFilters().Eq(i => i.Id, itemId); - - var update = _dbContext.RetryQueueItems.GetUpdateDefinition() - .Set(i => i.Status, status) - .Set(i => i.AttemptsCount, attemptsCount) - .Set(i => i.LastExecution, lastExecution) - .Set(i => i.ModifiedStatusDate, DateTime.UtcNow); - - if (!string.IsNullOrEmpty(description)) - { - update = update - .Set(i => i.Description, description); - } - - var updateResult = await _dbContext.RetryQueueItems.UpdateOneAsync(filter, update).ConfigureAwait(false); + var filter = _dbContext.RetryQueueItems.GetFilters().Eq(i => i.Id, itemId); + + var update = _dbContext.RetryQueueItems.GetUpdateDefinition() + .Set(i => i.Status, status) + .Set(i => i.AttemptsCount, attemptsCount) + .Set(i => i.LastExecution, lastExecution) + .Set(i => i.ModifiedStatusDate, DateTime.UtcNow); + + if (!string.IsNullOrEmpty(description)) + { + update = update + .Set(i => i.Description, description); + } - if (updateResult.IsAcknowledged && updateResult.MatchedCount == 0) - { - return new UpdateItemResult(itemId, UpdateItemResultStatus.ItemNotFound); - } + var updateResult = await _dbContext.RetryQueueItems.UpdateOneAsync(filter, update).ConfigureAwait(false); - return new UpdateItemResult(itemId, UpdateItemResultStatus.Updated); + if (updateResult.IsAcknowledged && updateResult.MatchedCount == 0) + { + return new UpdateItemResult(itemId, UpdateItemResultStatus.ItemNotFound); } + + return new UpdateItemResult(itemId, UpdateItemResultStatus.Updated); + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs index efcff985..67b362ce 100644 --- a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs @@ -17,116 +17,119 @@ internal class RetryQueueRepository : IRetryQueueRepository public RetryQueueRepository(DbContext dbContext) { - Guard.Argument(dbContext).NotNull(); + Guard.Argument(dbContext).NotNull(); - _dbContext = dbContext; - } + _dbContext = dbContext; + } public async Task DeleteQueuesAsync(IEnumerable queueIds) { - var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); + var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); - var deleteFilter = queuesFilterBuilder.In(q => q.Id, queueIds); + var deleteFilter = queuesFilterBuilder.In(q => q.Id, queueIds); - var deleteResult = await _dbContext.RetryQueues.DeleteManyAsync(deleteFilter).ConfigureAwait(false); + var deleteResult = await _dbContext.RetryQueues.DeleteManyAsync(deleteFilter).ConfigureAwait(false); - return new DeleteQueuesResult(GetDeletedCount(deleteResult)); - } + return new DeleteQueuesResult(GetDeletedCount(deleteResult)); + } public async Task GetQueueAsync(string queueGroupKey) { - var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); + var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); - var queuesFilter = queuesFilterBuilder.Eq(q => q.QueueGroupKey, queueGroupKey); + var queuesFilter = queuesFilterBuilder.Eq(q => q.QueueGroupKey, queueGroupKey); - return await _dbContext.RetryQueues.GetOneAsync(queuesFilter).ConfigureAwait(false); - } + return await _dbContext.RetryQueues.GetOneAsync(queuesFilter).ConfigureAwait(false); + } - public async Task> GetQueuesToDeleteAsync(string searchGroupKey, RetryQueueStatus status, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete) + public async Task> GetQueuesToDeleteAsync(string searchGroupKey, RetryQueueStatus status, + DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete) { - var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); + var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); - var findFilter = queuesFilterBuilder.Eq(q => q.SearchGroupKey, searchGroupKey) - & queuesFilterBuilder.Eq(q => q.Status, status) - & queuesFilterBuilder.Lt(q => q.LastExecution, maxLastExecutionDateToBeKept); + var findFilter = queuesFilterBuilder.Eq(q => q.SearchGroupKey, searchGroupKey) + & queuesFilterBuilder.Eq(q => q.Status, status) + & queuesFilterBuilder.Lt(q => q.LastExecution, maxLastExecutionDateToBeKept); - var options = new FindOptions - { - Limit = maxRowsToDelete - }; + var options = new FindOptions + { + Limit = maxRowsToDelete + }; - var queuesToDelete = await _dbContext.RetryQueues.GetAsync(findFilter, options).ConfigureAwait(false); + var queuesToDelete = await _dbContext.RetryQueues.GetAsync(findFilter, options).ConfigureAwait(false); - return queuesToDelete.Select(q => q.Id); - } + return queuesToDelete.Select(q => q.Id); + } - public async Task> GetTopSortedQueuesAsync(RetryQueueStatus status, GetQueuesSortOption sortOption, string searchGroupKey, int top) + public async Task> GetTopSortedQueuesAsync(RetryQueueStatus status, + GetQueuesSortOption sortOption, string searchGroupKey, int top) { - var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); + var queuesFilterBuilder = _dbContext.RetryQueues.GetFilters(); - var queuesFilter = queuesFilterBuilder.Eq(q => q.Status, status); + var queuesFilter = queuesFilterBuilder.Eq(q => q.Status, status); - if (searchGroupKey is object) - { - queuesFilter &= queuesFilterBuilder.Eq(q => q.SearchGroupKey, searchGroupKey); - } + if (searchGroupKey is object) + { + queuesFilter &= queuesFilterBuilder.Eq(q => q.SearchGroupKey, searchGroupKey); + } - SortDefinition sortDefinition; + SortDefinition sortDefinition; - switch (sortOption) - { - case GetQueuesSortOption.ByLastExecutionAscending: - sortDefinition = _dbContext.RetryQueues.GetSortDefinition().Ascending(i => i.LastExecution); - break; + switch (sortOption) + { + case GetQueuesSortOption.ByLastExecutionAscending: + sortDefinition = _dbContext.RetryQueues.GetSortDefinition().Ascending(i => i.LastExecution); + break; - case GetQueuesSortOption.ByCreationDateDescending: - default: - sortDefinition = _dbContext.RetryQueues.GetSortDefinition().Descending(i => i.CreationDate); - break; - } + case GetQueuesSortOption.ByCreationDateDescending: + default: + sortDefinition = _dbContext.RetryQueues.GetSortDefinition().Descending(i => i.CreationDate); + break; + } - var options = new FindOptions - { - Sort = sortDefinition, - Limit = top - }; + var options = new FindOptions + { + Sort = sortDefinition, + Limit = top + }; - return await _dbContext.RetryQueues.GetAsync(queuesFilter, options).ConfigureAwait(false); - } + return await _dbContext.RetryQueues.GetAsync(queuesFilter, options).ConfigureAwait(false); + } public async Task UpdateLastExecutionAsync(Guid queueId, DateTime lastExecution) { - var filter = _dbContext.RetryQueues.GetFilters().Eq(q => q.Id, queueId); + var filter = _dbContext.RetryQueues.GetFilters().Eq(q => q.Id, queueId); - var update = _dbContext.RetryQueues.GetUpdateDefinition() - .Set(q => q.LastExecution, lastExecution); + var update = _dbContext.RetryQueues.GetUpdateDefinition() + .Set(q => q.LastExecution, lastExecution); - return await _dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); - } + return await _dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); + } - public async Task UpdateStatusAndLastExecutionAsync(Guid queueId, RetryQueueStatus status, DateTime lastExecution) + public async Task UpdateStatusAndLastExecutionAsync(Guid queueId, RetryQueueStatus status, + DateTime lastExecution) { - var filter = _dbContext.RetryQueues.GetFilters().Eq(q => q.Id, queueId); + var filter = _dbContext.RetryQueues.GetFilters().Eq(q => q.Id, queueId); - var update = _dbContext.RetryQueues.GetUpdateDefinition() - .Set(q => q.Status, status) - .Set(q => q.LastExecution, lastExecution); + var update = _dbContext.RetryQueues.GetUpdateDefinition() + .Set(q => q.Status, status) + .Set(q => q.LastExecution, lastExecution); - return await _dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); - } + return await _dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); + } public async Task UpdateStatusAsync(Guid queueId, RetryQueueStatus status) { - var filter = _dbContext.RetryQueues.GetFilters().Eq(q => q.Id, queueId); + var filter = _dbContext.RetryQueues.GetFilters().Eq(q => q.Id, queueId); - var update = _dbContext.RetryQueues.GetUpdateDefinition() - .Set(q => q.Status, status); + var update = _dbContext.RetryQueues.GetUpdateDefinition() + .Set(q => q.Status, status); - return await _dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); - } + return await _dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); + } private int GetDeletedCount(DeleteResult deleteResult) { - return deleteResult.IsAcknowledged ? Convert.ToInt32(deleteResult.DeletedCount) : 0; - } + return deleteResult.IsAcknowledged ? Convert.ToInt32(deleteResult.DeletedCount) : 0; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/RetryDurableDefinitionBuilderExtension.cs b/src/KafkaFlow.Retry.MongoDb/RetryDurableDefinitionBuilderExtension.cs index 886f086c..b7928d35 100644 --- a/src/KafkaFlow.Retry.MongoDb/RetryDurableDefinitionBuilderExtension.cs +++ b/src/KafkaFlow.Retry.MongoDb/RetryDurableDefinitionBuilderExtension.cs @@ -9,24 +9,25 @@ public static RetryDurableDefinitionBuilder WithMongoDbDataProvider( string mongoDbretryQueueCollectionName, string mongoDbretryQueueItemCollectionName) { - var dataProviderCreation = new MongoDbDataProviderFactory() - .TryCreate( - new MongoDbSettings - { - ConnectionString = connectionString, - DatabaseName = databaseName, - RetryQueueCollectionName = mongoDbretryQueueCollectionName, - RetryQueueItemCollectionName = mongoDbretryQueueItemCollectionName - } - ); + var dataProviderCreation = new MongoDbDataProviderFactory() + .TryCreate( + new MongoDbSettings + { + ConnectionString = connectionString, + DatabaseName = databaseName, + RetryQueueCollectionName = mongoDbretryQueueCollectionName, + RetryQueueItemCollectionName = mongoDbretryQueueItemCollectionName + } + ); - if (!dataProviderCreation.Success) - { - throw new DataProviderCreationException($"The Retry Queue Data Provider could not be created. Error: {dataProviderCreation.Message}"); - } + if (!dataProviderCreation.Success) + { + throw new DataProviderCreationException( + $"The Retry Queue Data Provider could not be created. Error: {dataProviderCreation.Message}"); + } - retryDurableDefinitionBuilder.WithRepositoryProvider(dataProviderCreation.Result); + retryDurableDefinitionBuilder.WithRepositoryProvider(dataProviderCreation.Result); - return retryDurableDefinitionBuilder; - } + return retryDurableDefinitionBuilder; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs index 1f106d3c..f5400fb7 100644 --- a/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs @@ -32,404 +32,422 @@ internal RetryQueueDataProvider( IRetryQueueRepository retryQueueRepository, IRetryQueueItemRepository retryQueueItemRepository) { - Guard.Argument(dbContext).NotNull(); + Guard.Argument(dbContext).NotNull(); - _dbContext = dbContext; - _retryQueueRepository = retryQueueRepository; - _retryQueueItemRepository = retryQueueItemRepository; - var messageAdapter = new MessageAdapter(new HeaderAdapter()); + _dbContext = dbContext; + _retryQueueRepository = retryQueueRepository; + _retryQueueItemRepository = retryQueueItemRepository; + var messageAdapter = new MessageAdapter(new HeaderAdapter()); - _retryQueueItemDboFactory = new RetryQueueItemDboFactory(messageAdapter); - _queuesAdapter = new QueuesAdapter(new ItemAdapter(messageAdapter)); - } + _retryQueueItemDboFactory = new RetryQueueItemDboFactory(messageAdapter); + _queuesAdapter = new QueuesAdapter(new ItemAdapter(messageAdapter)); + } public async Task CheckQueueAsync(CheckQueueInput input) { - Guard.Argument(input).NotNull(); - - // Tries to find an active queue for the GroupKey - var retryQueueDbo = await _dbContext.RetryQueues - .AsQueryable() - .FirstOrDefaultAsync(q => - q.QueueGroupKey == input.QueueGroupKey && - q.Status != RetryQueueStatus.Done - ).ConfigureAwait(false); - - if (retryQueueDbo != null) - { - return new CheckQueueResult(CheckQueueResultStatus.Exists); - } - - return new CheckQueueResult(CheckQueueResultStatus.DoesNotExist); + Guard.Argument(input).NotNull(); + + // Tries to find an active queue for the GroupKey + var retryQueueDbo = await _dbContext.RetryQueues + .AsQueryable() + .FirstOrDefaultAsync(q => + q.QueueGroupKey == input.QueueGroupKey && + q.Status != RetryQueueStatus.Done + ).ConfigureAwait(false); + + if (retryQueueDbo != null) + { + return new CheckQueueResult(CheckQueueResultStatus.Exists); } + return new CheckQueueResult(CheckQueueResultStatus.DoesNotExist); + } + public async Task CheckQueueNewestItemsAsync(QueueNewestItemsInput input) { - Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(input, nameof(input)).NotNull(); - var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); + var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); - var itemsFilter = itemsFilterBuilder.Eq(i => i.RetryQueueId, input.QueueId) - & itemsFilterBuilder.In(i => i.Status, new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting, RetryQueueItemStatus.InRetry }) - & itemsFilterBuilder.Gt(i => i.Sort, input.Sort); + var itemsFilter = itemsFilterBuilder.Eq(i => i.RetryQueueId, input.QueueId) + & itemsFilterBuilder.In(i => i.Status, + new[] { RetryQueueItemStatus.Waiting, RetryQueueItemStatus.InRetry }) + & itemsFilterBuilder.Gt(i => i.Sort, input.Sort); - var itemsDbo = await _dbContext.RetryQueueItems.GetAsync(itemsFilter).ConfigureAwait(false); + var itemsDbo = await _dbContext.RetryQueueItems.GetAsync(itemsFilter).ConfigureAwait(false); - if (itemsDbo.Any()) - { - return new QueueNewestItemsResult(QueueNewestItemsResultStatus.HasNewestItems); - } - - return new QueueNewestItemsResult(QueueNewestItemsResultStatus.NoNewestItems); + if (itemsDbo.Any()) + { + return new QueueNewestItemsResult(QueueNewestItemsResultStatus.HasNewestItems); } + return new QueueNewestItemsResult(QueueNewestItemsResultStatus.NoNewestItems); + } + public async Task CheckQueuePendingItemsAsync(QueuePendingItemsInput input) { - Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(input, nameof(input)).NotNull(); - var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); + var itemsFilterBuilder = _dbContext.RetryQueueItems.GetFilters(); - var itemsFilter = itemsFilterBuilder.Eq(i => i.RetryQueueId, input.QueueId) - & itemsFilterBuilder.In(i => i.Status, new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting, RetryQueueItemStatus.InRetry }) - & itemsFilterBuilder.Lt(i => i.Sort, input.Sort); + var itemsFilter = itemsFilterBuilder.Eq(i => i.RetryQueueId, input.QueueId) + & itemsFilterBuilder.In(i => i.Status, + new[] { RetryQueueItemStatus.Waiting, RetryQueueItemStatus.InRetry }) + & itemsFilterBuilder.Lt(i => i.Sort, input.Sort); - var itemsDbo = await _dbContext.RetryQueueItems.GetAsync(itemsFilter).ConfigureAwait(false); + var itemsDbo = await _dbContext.RetryQueueItems.GetAsync(itemsFilter).ConfigureAwait(false); - if (itemsDbo.Any()) - { - return new QueuePendingItemsResult(QueuePendingItemsResultStatus.HasPendingItems); - } - - return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems); + if (itemsDbo.Any()) + { + return new QueuePendingItemsResult(QueuePendingItemsResultStatus.HasPendingItems); } + return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems); + } + public async Task DeleteQueuesAsync(DeleteQueuesInput input) { - Guard.Argument(input, nameof(input)).NotNull(); - - var queueIdsToDelete = await _retryQueueRepository - .GetQueuesToDeleteAsync( - input.SearchGroupKey, - input.RetryQueueStatus, - input.MaxLastExecutionDateToBeKept, - input.MaxRowsToDelete) - .ConfigureAwait(false); - - await _retryQueueItemRepository - .DeleteItemsAsync(queueIdsToDelete) - .ConfigureAwait(false); - - return await _retryQueueRepository - .DeleteQueuesAsync(queueIdsToDelete) - .ConfigureAwait(false); - } + Guard.Argument(input, nameof(input)).NotNull(); + + var queueIdsToDelete = await _retryQueueRepository + .GetQueuesToDeleteAsync( + input.SearchGroupKey, + input.RetryQueueStatus, + input.MaxLastExecutionDateToBeKept, + input.MaxRowsToDelete) + .ConfigureAwait(false); + + await _retryQueueItemRepository + .DeleteItemsAsync(queueIdsToDelete) + .ConfigureAwait(false); + + return await _retryQueueRepository + .DeleteQueuesAsync(queueIdsToDelete) + .ConfigureAwait(false); + } public async Task GetQueuesAsync(GetQueuesInput input) { - Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(input, nameof(input)).NotNull(); - var queuesDbo = await _retryQueueRepository.GetTopSortedQueuesAsync(input.Status, input.SortOption, input.SearchGroupKey, input.TopQueues).ConfigureAwait(false); - - if (!queuesDbo.Any()) - { - return new GetQueuesResult(Enumerable.Empty()); - } + var queuesDbo = await _retryQueueRepository + .GetTopSortedQueuesAsync(input.Status, input.SortOption, input.SearchGroupKey, input.TopQueues) + .ConfigureAwait(false); - var itemsDbo = new List(); + if (!queuesDbo.Any()) + { + return new GetQueuesResult(Enumerable.Empty()); + } - var queueIds = queuesDbo.Select(q => q.Id); + var itemsDbo = new List(); - foreach (var queueId in queueIds) - { - var queueeItemsDbo = await _retryQueueItemRepository.GetItemsAsync( - new Guid[] { queueId }, - input.ItemsStatuses, - input.SeverityLevels, - input.TopItemsByQueue, - input.StuckStatusFilter) - .ConfigureAwait(false); - - itemsDbo.AddRange(queueeItemsDbo); - } + var queueIds = queuesDbo.Select(q => q.Id); - var queues = _queuesAdapter.Adapt(queuesDbo, itemsDbo); + foreach (var queueId in queueIds) + { + var queueeItemsDbo = await _retryQueueItemRepository.GetItemsAsync( + new[] { queueId }, + input.ItemsStatuses, + input.SeverityLevels, + input.TopItemsByQueue, + input.StuckStatusFilter) + .ConfigureAwait(false); - return new GetQueuesResult(queues); + itemsDbo.AddRange(queueeItemsDbo); } + var queues = _queuesAdapter.Adapt(queuesDbo, itemsDbo); + + return new GetQueuesResult(queues); + } + public async Task SaveToQueueAsync(SaveToQueueInput input) { - Guard.Argument(input).NotNull(); - - var retryQueueDbo = await _dbContext.RetryQueues - .AsQueryable() - .FirstOrDefaultAsync(q => q.QueueGroupKey == input.QueueGroupKey); + Guard.Argument(input).NotNull(); - if (retryQueueDbo is null) - { - await CreateItemIntoANewQueueAsync(input).ConfigureAwait(false); - return new SaveToQueueResult(SaveToQueueResultStatus.Created); - } + var retryQueueDbo = await _dbContext.RetryQueues + .AsQueryable() + .FirstOrDefaultAsync(q => q.QueueGroupKey == input.QueueGroupKey); - await AddItemIntoAnExistingQueueAsync(input, retryQueueDbo).ConfigureAwait(false); - return new SaveToQueueResult(SaveToQueueResultStatus.Added); + if (retryQueueDbo is null) + { + await CreateItemIntoANewQueueAsync(input).ConfigureAwait(false); + return new SaveToQueueResult(SaveToQueueResultStatus.Created); } + await AddItemIntoAnExistingQueueAsync(input, retryQueueDbo).ConfigureAwait(false); + return new SaveToQueueResult(SaveToQueueResultStatus.Added); + } + public async Task UpdateItemExecutionInfoAsync(UpdateItemExecutionInfoInput input) { - Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(input, nameof(input)).NotNull(); - return await UpdateItemAndTryUpdateQueueToDoneAsync(input).ConfigureAwait(false); - } + return await UpdateItemAndTryUpdateQueueToDoneAsync(input).ConfigureAwait(false); + } public async Task UpdateItemsAsync(UpdateItemsInput input) { - Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(input, nameof(input)).NotNull(); - var results = new List(); + var results = new List(); - foreach (var itemId in input.ItemIds) - { - var result = await UpdateItemAndQueueStatusAsync(new UpdateItemStatusInput(itemId, input.Status)).ConfigureAwait(false); - - results.Add(result); - } + foreach (var itemId in input.ItemIds) + { + var result = await UpdateItemAndQueueStatusAsync(new UpdateItemStatusInput(itemId, input.Status)) + .ConfigureAwait(false); - return new UpdateItemsResult(results); + results.Add(result); } + return new UpdateItemsResult(results); + } + public async Task UpdateItemStatusAsync(UpdateItemStatusInput input) { - Guard.Argument(input, nameof(input)).NotNull(); - - var filter = _dbContext.RetryQueueItems.GetFilters().Eq(i => i.Id, input.ItemId); + Guard.Argument(input, nameof(input)).NotNull(); - var update = _dbContext.RetryQueueItems.GetUpdateDefinition().Set(i => i.Status, input.Status) - .Set(i => i.ModifiedStatusDate, DateTime.UtcNow); + var filter = _dbContext.RetryQueueItems.GetFilters().Eq(i => i.Id, input.ItemId); - var updateResult = await _dbContext.RetryQueueItems.UpdateOneAsync(filter, update).ConfigureAwait(false); + var update = _dbContext.RetryQueueItems.GetUpdateDefinition().Set(i => i.Status, input.Status) + .Set(i => i.ModifiedStatusDate, DateTime.UtcNow); - if (updateResult.IsAcknowledged && updateResult.MatchedCount == 0) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemNotFound); - } + var updateResult = await _dbContext.RetryQueueItems.UpdateOneAsync(filter, update).ConfigureAwait(false); - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.Updated); + if (updateResult.IsAcknowledged && updateResult.MatchedCount == 0) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemNotFound); } + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.Updated); + } + public async Task UpdateQueuesAsync(UpdateQueuesInput input) { - Guard.Argument(input, nameof(input)).NotNull(); - - var results = new List(); + Guard.Argument(input, nameof(input)).NotNull(); - foreach (var queueGroupKey in input.QueueGroupKeys) - { - var result = await UpdateQueueAndAllItemsAsync(new UpdateItemsInQueueInput(queueGroupKey, input.ItemStatus)).ConfigureAwait(false); + var results = new List(); - results.Add(result); - } + foreach (var queueGroupKey in input.QueueGroupKeys) + { + var result = await UpdateQueueAndAllItemsAsync(new UpdateItemsInQueueInput(queueGroupKey, input.ItemStatus)) + .ConfigureAwait(false); - return new UpdateQueuesResult(results); + results.Add(result); } + return new UpdateQueuesResult(results); + } + private async Task AddItemIntoAnExistingQueueAsync(SaveToQueueInput input, RetryQueueDbo retryQueueDbo) { - // Gets the total items in the queue. - var totalItemsInQueue = _dbContext.RetryQueueItems - .AsQueryable() - .Where(i => i.RetryQueueId == retryQueueDbo.Id) - .Count(); - - // Inserts the new item at the last position in the queue. - var retryQueueItemDbo = _retryQueueItemDboFactory.Create(input, retryQueueDbo.Id, totalItemsInQueue); - await _dbContext.RetryQueueItems.InsertOneAsync(retryQueueItemDbo).ConfigureAwait(false); - - // Verifies whether to change the queue status. - if (retryQueueDbo.Status == RetryQueueStatus.Done) - { - // The queue was marked as DONE. With this new item, the status should return to ACTIVE. - await _dbContext.RetryQueues - .FindOneAndUpdateAsync( - q => q.Id == retryQueueDbo.Id, - Builders.Update.Set(q => q.Status, RetryQueueStatus.Active) - ).ConfigureAwait(false); - } + // Gets the total items in the queue. + var totalItemsInQueue = _dbContext.RetryQueueItems + .AsQueryable() + .Where(i => i.RetryQueueId == retryQueueDbo.Id) + .Count(); + + // Inserts the new item at the last position in the queue. + var retryQueueItemDbo = _retryQueueItemDboFactory.Create(input, retryQueueDbo.Id, totalItemsInQueue); + await _dbContext.RetryQueueItems.InsertOneAsync(retryQueueItemDbo).ConfigureAwait(false); + + // Verifies whether to change the queue status. + if (retryQueueDbo.Status == RetryQueueStatus.Done) + { + // The queue was marked as DONE. With this new item, the status should return to ACTIVE. + await _dbContext.RetryQueues + .FindOneAndUpdateAsync( + q => q.Id == retryQueueDbo.Id, + Builders.Update.Set(q => q.Status, RetryQueueStatus.Active) + ).ConfigureAwait(false); } + } private async Task CreateItemIntoANewQueueAsync(SaveToQueueInput input) { - // Creates the queue - var retryQueueDbo = RetryQueueDboFactory.Create(input); - await _dbContext.RetryQueues.InsertOneAsync(retryQueueDbo).ConfigureAwait(false); + // Creates the queue + var retryQueueDbo = RetryQueueDboFactory.Create(input); + await _dbContext.RetryQueues.InsertOneAsync(retryQueueDbo).ConfigureAwait(false); - // Adds the item - var retryQueueItemDbo = _retryQueueItemDboFactory.Create(input, retryQueueDbo.Id); - await _dbContext.RetryQueueItems.InsertOneAsync(retryQueueItemDbo).ConfigureAwait(false); - } + // Adds the item + var retryQueueItemDbo = _retryQueueItemDboFactory.Create(input, retryQueueDbo.Id); + await _dbContext.RetryQueueItems.InsertOneAsync(retryQueueItemDbo).ConfigureAwait(false); + } private bool IsItemInWaitingState(RetryQueueItemDbo item) { - return item.Status == RetryQueueItemStatus.Waiting; - } + return item.Status == RetryQueueItemStatus.Waiting; + } private async Task TryUpdateQueueToDoneAsync(Guid queueId) { - var anyItemStillActive = await _retryQueueItemRepository.AnyItemStillActiveAsync(queueId).ConfigureAwait(false); - - if (!anyItemStillActive) - { - var updateQueueResult = await _retryQueueRepository.UpdateStatusAsync(queueId, RetryQueueStatus.Done).ConfigureAwait(false); + var anyItemStillActive = await _retryQueueItemRepository.AnyItemStillActiveAsync(queueId).ConfigureAwait(false); - if (updateQueueResult.IsAcknowledged && updateQueueResult.MatchedCount == 0) - { - return UpdateQueueResultStatus.QueueNotFound; - } + if (!anyItemStillActive) + { + var updateQueueResult = await _retryQueueRepository.UpdateStatusAsync(queueId, RetryQueueStatus.Done) + .ConfigureAwait(false); - return UpdateQueueResultStatus.Updated; + if (updateQueueResult.IsAcknowledged && updateQueueResult.MatchedCount == 0) + { + return UpdateQueueResultStatus.QueueNotFound; } - return UpdateQueueResultStatus.NotUpdated; + return UpdateQueueResultStatus.Updated; } + return UpdateQueueResultStatus.NotUpdated; + } + private async Task UpdateItemAndQueueStatusAsync(UpdateItemStatusInput input) { - if (input.Status != RetryQueueItemStatus.Cancelled) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.UpdateIsNotAllowed); - } - - var item = await _retryQueueItemRepository.GetItemAsync(input.ItemId).ConfigureAwait(false); + if (input.Status != RetryQueueItemStatus.Cancelled) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.UpdateIsNotAllowed); + } - if (item is null) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemNotFound); - } + var item = await _retryQueueItemRepository.GetItemAsync(input.ItemId).ConfigureAwait(false); - if (!IsItemInWaitingState(item)) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemIsNotInWaitingState); - } + if (item is null) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemNotFound); + } - if (!await _retryQueueItemRepository.IsFirstWaitingInQueue(item).ConfigureAwait(false)) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemIsNotTheFirstWaitingInQueue); - } + if (!IsItemInWaitingState(item)) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemIsNotInWaitingState); + } - var updateItemResult = await UpdateItemStatusAsync(input).ConfigureAwait(false); + if (!await _retryQueueItemRepository.IsFirstWaitingInQueue(item).ConfigureAwait(false)) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemIsNotTheFirstWaitingInQueue); + } - if (updateItemResult.Status == UpdateItemResultStatus.ItemNotFound) - { - return updateItemResult; - } + var updateItemResult = await UpdateItemStatusAsync(input).ConfigureAwait(false); - var updateQueueResultStatus = await TryUpdateQueueToDoneAsync(item.RetryQueueId).ConfigureAwait(false); + if (updateItemResult.Status == UpdateItemResultStatus.ItemNotFound) + { + return updateItemResult; + } - if (updateQueueResultStatus == UpdateQueueResultStatus.QueueNotFound) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.QueueNotFound); - } + var updateQueueResultStatus = await TryUpdateQueueToDoneAsync(item.RetryQueueId).ConfigureAwait(false); - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.Updated); + if (updateQueueResultStatus == UpdateQueueResultStatus.QueueNotFound) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.QueueNotFound); } + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.Updated); + } + private async Task UpdateItemAndTryUpdateQueueToDoneAsync(UpdateItemExecutionInfoInput input) { - //update item - var updateItemResult = await _retryQueueItemRepository - .UpdateItemAsync(input.ItemId, input.Status, input.AttemptCount, input.LastExecution, input.Description).ConfigureAwait(false); - - if (updateItemResult.Status == UpdateItemResultStatus.ItemNotFound) - { - return updateItemResult; - } - - // update queue last execution and try update queue to done - var updateQueueResultStatus = await UpdateQueueLastExecutionAndTryUpdateQueueToDoneAsync(input.QueueId, input.LastExecution).ConfigureAwait(false); + //update item + var updateItemResult = await _retryQueueItemRepository + .UpdateItemAsync(input.ItemId, input.Status, input.AttemptCount, input.LastExecution, input.Description) + .ConfigureAwait(false); + + if (updateItemResult.Status == UpdateItemResultStatus.ItemNotFound) + { + return updateItemResult; + } - if (updateQueueResultStatus == UpdateQueueResultStatus.QueueNotFound) - { - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.QueueNotFound); - } + // update queue last execution and try update queue to done + var updateQueueResultStatus = + await UpdateQueueLastExecutionAndTryUpdateQueueToDoneAsync(input.QueueId, input.LastExecution) + .ConfigureAwait(false); - return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.Updated); + if (updateQueueResultStatus == UpdateQueueResultStatus.QueueNotFound) + { + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.QueueNotFound); } + return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.Updated); + } + private async Task UpdateQueueAndAllItemsAsync(UpdateItemsInQueueInput input) { - var queue = await _retryQueueRepository.GetQueueAsync(input.QueueGroupKey).ConfigureAwait(false); + var queue = await _retryQueueRepository.GetQueueAsync(input.QueueGroupKey).ConfigureAwait(false); - if (queue is null) - { - return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.QueueNotFound, RetryQueueStatus.None); - } - - if (input.ItemStatus != RetryQueueItemStatus.Cancelled) - { - return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.UpdateIsNotAllowed, queue.Status); - } - - if (queue.Status != RetryQueueStatus.Active) - { - return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.QueueIsNotActive, queue.Status); - } + if (queue is null) + { + return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.QueueNotFound, + RetryQueueStatus.None); + } - var items = await _retryQueueItemRepository - .GetItemsAsync(new Guid[] { queue.Id }, new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting }) - .ConfigureAwait(false); + if (input.ItemStatus != RetryQueueItemStatus.Cancelled) + { + return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.UpdateIsNotAllowed, queue.Status); + } - if (!items.Any()) - { - return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.QueueHasNoItemsWaiting, queue.Status); - } + if (queue.Status != RetryQueueStatus.Active) + { + return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.QueueIsNotActive, queue.Status); + } - foreach (var item in items) - { - var updateItemResult = await UpdateItemStatusAsync(new UpdateItemStatusInput(item.Id, input.ItemStatus)).ConfigureAwait(false); + var items = await _retryQueueItemRepository + .GetItemsAsync(new[] { queue.Id }, new[] { RetryQueueItemStatus.Waiting }) + .ConfigureAwait(false); - if (updateItemResult.Status != UpdateItemResultStatus.Updated) - { - return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.FailedToUpdateAllItems, queue.Status); - } - } + if (!items.Any()) + { + return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.QueueHasNoItemsWaiting, + queue.Status); + } - var updateQueueResultStatus = await TryUpdateQueueToDoneAsync(queue.Id).ConfigureAwait(false); + foreach (var item in items) + { + var updateItemResult = await UpdateItemStatusAsync(new UpdateItemStatusInput(item.Id, input.ItemStatus)) + .ConfigureAwait(false); - if (updateQueueResultStatus == UpdateQueueResultStatus.QueueNotFound) + if (updateItemResult.Status != UpdateItemResultStatus.Updated) { - return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.AllItemsUpdatedButFailedToUpdateQueue, queue.Status); + return new UpdateQueueResult(input.QueueGroupKey, UpdateQueueResultStatus.FailedToUpdateAllItems, + queue.Status); } + } - queue = await _retryQueueRepository.GetQueueAsync(input.QueueGroupKey).ConfigureAwait(false); + var updateQueueResultStatus = await TryUpdateQueueToDoneAsync(queue.Id).ConfigureAwait(false); - return new UpdateQueueResult(input.QueueGroupKey, updateQueueResultStatus, queue.Status); + if (updateQueueResultStatus == UpdateQueueResultStatus.QueueNotFound) + { + return new UpdateQueueResult(input.QueueGroupKey, + UpdateQueueResultStatus.AllItemsUpdatedButFailedToUpdateQueue, queue.Status); } - private async Task UpdateQueueLastExecutionAndTryUpdateQueueToDoneAsync(Guid queueId, DateTime lastExecution) + queue = await _retryQueueRepository.GetQueueAsync(input.QueueGroupKey).ConfigureAwait(false); + + return new UpdateQueueResult(input.QueueGroupKey, updateQueueResultStatus, queue.Status); + } + + private async Task UpdateQueueLastExecutionAndTryUpdateQueueToDoneAsync(Guid queueId, + DateTime lastExecution) { - var anyItemStillActive = await _retryQueueItemRepository.AnyItemStillActiveAsync(queueId).ConfigureAwait(false); + var anyItemStillActive = await _retryQueueItemRepository.AnyItemStillActiveAsync(queueId).ConfigureAwait(false); - if (anyItemStillActive) - { - // update queue last execution only - var updateQueueLastExecutionResult = await _retryQueueRepository.UpdateLastExecutionAsync(queueId, lastExecution).ConfigureAwait(false); + if (anyItemStillActive) + { + // update queue last execution only + var updateQueueLastExecutionResult = await _retryQueueRepository + .UpdateLastExecutionAsync(queueId, lastExecution).ConfigureAwait(false); - if (updateQueueLastExecutionResult.IsAcknowledged && updateQueueLastExecutionResult.MatchedCount == 0) - { - return UpdateQueueResultStatus.QueueNotFound; - } - } - else + if (updateQueueLastExecutionResult.IsAcknowledged && updateQueueLastExecutionResult.MatchedCount == 0) { - // update queue last execution and the status to done - var updateQueueResult = await _retryQueueRepository.UpdateStatusAndLastExecutionAsync(queueId, RetryQueueStatus.Done, lastExecution).ConfigureAwait(false); - - if (updateQueueResult.IsAcknowledged && updateQueueResult.MatchedCount == 0) - { - return UpdateQueueResultStatus.QueueNotFound; - } + return UpdateQueueResultStatus.QueueNotFound; } + } + else + { + // update queue last execution and the status to done + var updateQueueResult = await _retryQueueRepository + .UpdateStatusAndLastExecutionAsync(queueId, RetryQueueStatus.Done, lastExecution).ConfigureAwait(false); - return UpdateQueueResultStatus.Updated; + if (updateQueueResult.IsAcknowledged && updateQueueResult.MatchedCount == 0) + { + return UpdateQueueResultStatus.QueueNotFound; + } } + + return UpdateQueueResultStatus.Updated; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs b/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs index d9217c18..a8eb3003 100644 --- a/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs +++ b/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs @@ -6,15 +6,15 @@ internal sealed class ConnectionProvider : IConnectionProvider { public IDbConnection Create(PostgresDbSettings postgresDbSettings) { - Guard.Argument(postgresDbSettings).NotNull(); + Guard.Argument(postgresDbSettings).NotNull(); - return new DbConnectionContext(postgresDbSettings, false); - } + return new DbConnectionContext(postgresDbSettings, false); + } public IDbConnectionWithinTransaction CreateWithinTransaction(PostgresDbSettings postgresDbSettings) { - Guard.Argument(postgresDbSettings).NotNull(); + Guard.Argument(postgresDbSettings).NotNull(); - return new DbConnectionContext(postgresDbSettings, true); - } + return new DbConnectionContext(postgresDbSettings, true); + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs b/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs index b6745418..cb229dc5 100644 --- a/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs +++ b/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs @@ -15,74 +15,77 @@ internal sealed class DbConnectionContext : IDbConnectionWithinTransaction public DbConnectionContext(PostgresDbSettings postgresDbSettings, bool withinTransaction) { - Guard.Argument(postgresDbSettings).NotNull(); - _postgresDbSettings = postgresDbSettings; - _withinTransaction = withinTransaction; - } + Guard.Argument(postgresDbSettings).NotNull(); + _postgresDbSettings = postgresDbSettings; + _withinTransaction = withinTransaction; + } public void Commit() { - if (_sqlTransaction is object) - { - _sqlTransaction.Commit(); - _committed = true; - } + if (_sqlTransaction is object) + { + _sqlTransaction.Commit(); + _committed = true; } + } public NpgsqlCommand CreateCommand() { - var dbCommand = GetDbConnection().CreateCommand(); - - if (_withinTransaction) - { - dbCommand.Transaction = GetDbTransaction(); - } + var dbCommand = GetDbConnection().CreateCommand(); - return dbCommand; + if (_withinTransaction) + { + dbCommand.Transaction = GetDbTransaction(); } + return dbCommand; + } + public void Dispose() { - if (_sqlTransaction is object) + if (_sqlTransaction is object) + { + if (!_committed) { - if (!_committed) - { - Rollback(); - } - _sqlTransaction.Dispose(); + Rollback(); } - if (_sqlConnection is object) - { - _sqlConnection.Dispose(); - } + _sqlTransaction.Dispose(); } + if (_sqlConnection is object) + { + _sqlConnection.Dispose(); + } + } + public void Rollback() { - if (_sqlTransaction is object) - { - _sqlTransaction.Rollback(); - } + if (_sqlTransaction is object) + { + _sqlTransaction.Rollback(); } + } private NpgsqlConnection GetDbConnection() { - if (_sqlConnection is null) - { - _sqlConnection = new NpgsqlConnection(_postgresDbSettings.ConnectionString); - _sqlConnection.Open(); - _sqlConnection.ChangeDatabase(_postgresDbSettings.DatabaseName); - } - return _sqlConnection; + if (_sqlConnection is null) + { + _sqlConnection = new NpgsqlConnection(_postgresDbSettings.ConnectionString); + _sqlConnection.Open(); + _sqlConnection.ChangeDatabase(_postgresDbSettings.DatabaseName); } + return _sqlConnection; + } + private NpgsqlTransaction GetDbTransaction() { - if (_sqlTransaction is null) - { - _sqlTransaction = GetDbConnection().BeginTransaction(); - } - return _sqlTransaction; + if (_sqlTransaction is null) + { + _sqlTransaction = GetDbConnection().BeginTransaction(); } + + return _sqlTransaction; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs index 36279b0f..01595e32 100644 --- a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs @@ -8,16 +8,16 @@ internal sealed class RetryQueueDboFactory : IRetryQueueDboFactory { public RetryQueueDbo Create(SaveToQueueInput input) { - Guard.Argument(input).NotNull(); + Guard.Argument(input).NotNull(); - return new RetryQueueDbo - { - IdDomain = Guid.NewGuid(), - SearchGroupKey = input.SearchGroupKey, - QueueGroupKey = input.QueueGroupKey, - CreationDate = input.CreationDate, - LastExecution = input.LastExecution.Value, - Status = input.QueueStatus - }; - } + return new RetryQueueDbo + { + IdDomain = Guid.NewGuid(), + SearchGroupKey = input.SearchGroupKey, + QueueGroupKey = input.QueueGroupKey, + CreationDate = input.CreationDate, + LastExecution = input.LastExecution.Value, + Status = input.QueueStatus + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs index 45c044c8..47a8f2f3 100644 --- a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs @@ -8,22 +8,22 @@ internal sealed class RetryQueueItemDboFactory : IRetryQueueItemDboFactory { public RetryQueueItemDbo Create(SaveToQueueInput input, long retryQueueId, Guid retryQueueDomainId) { - Guard.Argument(input, nameof(input)).NotNull(); - Guard.Argument(retryQueueId, nameof(retryQueueId)).Positive(); - Guard.Argument(retryQueueDomainId, nameof(retryQueueDomainId)).NotDefault(); + Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(retryQueueId, nameof(retryQueueId)).Positive(); + Guard.Argument(retryQueueDomainId, nameof(retryQueueDomainId)).NotDefault(); - return new RetryQueueItemDbo - { - IdDomain = Guid.NewGuid(), - CreationDate = input.CreationDate, - LastExecution = input.LastExecution, - ModifiedStatusDate = input.ModifiedStatusDate, - AttemptsCount = input.AttemptsCount, - RetryQueueId = retryQueueId, - DomainRetryQueueId = retryQueueDomainId, - Status = input.ItemStatus, - SeverityLevel = input.SeverityLevel, - Description = input.Description - }; - } + return new RetryQueueItemDbo + { + IdDomain = Guid.NewGuid(), + CreationDate = input.CreationDate, + LastExecution = input.LastExecution, + ModifiedStatusDate = input.ModifiedStatusDate, + AttemptsCount = input.AttemptsCount, + RetryQueueId = retryQueueId, + DomainRetryQueueId = retryQueueDomainId, + Status = input.ItemStatus, + SeverityLevel = input.SeverityLevel, + Description = input.Description + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs index 13d8995d..2741ad21 100644 --- a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs @@ -7,18 +7,18 @@ internal sealed class RetryQueueItemMessageDboFactory : IRetryQueueItemMessageDb { public RetryQueueItemMessageDbo Create(RetryQueueItemMessage retryQueueItemMessage, long retryQueueItemId) { - Guard.Argument(retryQueueItemMessage, nameof(retryQueueItemMessage)).NotNull(); - Guard.Argument(retryQueueItemId, nameof(retryQueueItemId)).Positive(); + Guard.Argument(retryQueueItemMessage, nameof(retryQueueItemMessage)).NotNull(); + Guard.Argument(retryQueueItemId, nameof(retryQueueItemId)).Positive(); - return new RetryQueueItemMessageDbo - { - IdRetryQueueItem = retryQueueItemId, - Key = retryQueueItemMessage.Key, - Value = retryQueueItemMessage.Value, - Offset = retryQueueItemMessage.Offset, - Partition = retryQueueItemMessage.Partition, - TopicName = retryQueueItemMessage.TopicName, - UtcTimeStamp = retryQueueItemMessage.UtcTimeStamp, - }; - } + return new RetryQueueItemMessageDbo + { + IdRetryQueueItem = retryQueueItemId, + Key = retryQueueItemMessage.Key, + Value = retryQueueItemMessage.Value, + Offset = retryQueueItemMessage.Offset, + Partition = retryQueueItemMessage.Partition, + TopicName = retryQueueItemMessage.TopicName, + UtcTimeStamp = retryQueueItemMessage.UtcTimeStamp + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs index bb71f0c4..b0997d0f 100644 --- a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs @@ -9,21 +9,21 @@ internal sealed class RetryQueueItemMessageHeaderDboFactory : IRetryQueueItemMes { public IEnumerable Create(IEnumerable headers, long retryQueueItemId) { - Guard.Argument(headers).NotNull(); - Guard.Argument(retryQueueItemId, nameof(retryQueueItemId)).Positive(); + Guard.Argument(headers).NotNull(); + Guard.Argument(retryQueueItemId, nameof(retryQueueItemId)).Positive(); - return headers.Select(h => Adapt(h, retryQueueItemId)); - } + return headers.Select(h => Adapt(h, retryQueueItemId)); + } private RetryQueueItemMessageHeaderDbo Adapt(MessageHeader header, long retryQueueItemId) { - Guard.Argument(header).NotNull(); + Guard.Argument(header).NotNull(); - return new RetryQueueItemMessageHeaderDbo - { - Key = header.Key, - Value = header.Value, - RetryQueueItemMessageId = retryQueueItemId - }; - } + return new RetryQueueItemMessageHeaderDbo + { + Key = header.Key, + Value = header.Value, + RetryQueueItemMessageId = retryQueueItemId + }; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs b/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs index 4e76addd..0c939c58 100644 --- a/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs +++ b/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs @@ -8,11 +8,11 @@ internal class RetryQueuesDboWrapper { public RetryQueuesDboWrapper() { - QueuesDbos = new RetryQueueDbo[0]; - ItemsDbos = new RetryQueueItemDbo[0]; - MessagesDbos = new RetryQueueItemMessageDbo[0]; - HeadersDbos = new RetryQueueItemMessageHeaderDbo[0]; - } + QueuesDbos = new RetryQueueDbo[0]; + ItemsDbos = new RetryQueueItemDbo[0]; + MessagesDbos = new RetryQueueItemMessageDbo[0]; + HeadersDbos = new RetryQueueItemMessageHeaderDbo[0]; + } public IList HeadersDbos { get; set; } public IList ItemsDbos { get; set; } diff --git a/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs b/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs index f66dc08c..102b86d2 100644 --- a/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs +++ b/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs @@ -6,10 +6,10 @@ public class Script { public Script(string value) { - Guard.Argument(value, nameof(value)).NotNull(); + Guard.Argument(value, nameof(value)).NotNull(); - Value = value; - } + Value = value; + } public string Value { get; set; } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs b/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs index 66259340..973f7647 100644 --- a/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs +++ b/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs @@ -15,58 +15,63 @@ public sealed class PostgresDbDataProviderFactory { public IRetryDurableQueueRepositoryProvider Create(PostgresDbSettings postgresDbSettings) { - Guard.Argument(postgresDbSettings) - .NotNull("It is mandatory to config the factory before creating new instances of IRetryQueueDataProvider. Make sure the Config method is executed before the Create method."); + Guard.Argument(postgresDbSettings) + .NotNull( + "It is mandatory to config the factory before creating new instances of IRetryQueueDataProvider. Make sure the Config method is executed before the Create method."); - var retryQueueItemMessageAdapter = - new RetryQueueItemMessageDboFactory(); + var retryQueueItemMessageAdapter = + new RetryQueueItemMessageDboFactory(); - var retryQueueReader = new RetryQueueReader( - new RetryQueueAdapter(), - new RetryQueueItemAdapter(), - new RetryQueueItemMessageAdapter(), - new RetryQueueItemMessageHeaderAdapter() - ); + var retryQueueReader = new RetryQueueReader( + new RetryQueueAdapter(), + new RetryQueueItemAdapter(), + new RetryQueueItemMessageAdapter(), + new RetryQueueItemMessageHeaderAdapter() + ); - return new RetryQueueDataProvider( - postgresDbSettings, - new ConnectionProvider(), - new RetryQueueItemMessageHeaderRepository(), - new RetryQueueItemMessageRepository(), - new RetryQueueItemRepository(), - new RetryQueueRepository(), - new RetryQueueDboFactory(), - new RetryQueueItemDboFactory(), - retryQueueReader, - retryQueueItemMessageAdapter, - new RetryQueueItemMessageHeaderDboFactory()); - } + return new RetryQueueDataProvider( + postgresDbSettings, + new ConnectionProvider(), + new RetryQueueItemMessageHeaderRepository(), + new RetryQueueItemMessageRepository(), + new RetryQueueItemRepository(), + new RetryQueueRepository(), + new RetryQueueDboFactory(), + new RetryQueueItemDboFactory(), + retryQueueReader, + retryQueueItemMessageAdapter, + new RetryQueueItemMessageHeaderDboFactory()); + } - public IRetrySchemaCreator CreateSchemaCreator(PostgresDbSettings postgresDbSettings) => new RetrySchemaCreator(postgresDbSettings, GetScriptsForSchemaCreation()); + public IRetrySchemaCreator CreateSchemaCreator(PostgresDbSettings postgresDbSettings) + { + return new RetrySchemaCreator(postgresDbSettings, GetScriptsForSchemaCreation()); + } private IEnumerable