diff --git a/Btms.Business.Tests/Commands/SyncClearanceRequestsCommandTests.cs b/Btms.Business.Tests/Commands/SyncClearanceRequestsCommandTests.cs index 85c26217..63f3a89a 100644 --- a/Btms.Business.Tests/Commands/SyncClearanceRequestsCommandTests.cs +++ b/Btms.Business.Tests/Commands/SyncClearanceRequestsCommandTests.cs @@ -51,7 +51,7 @@ public async Task WhenClearanceRequestBlobsExist_ThenTheyShouldBePlacedOnInterna await handler.Handle(command, CancellationToken.None); // ASSERT - await bus.Received(1).Publish(Arg.Any(), "ALVS", + await bus.Received(1).Publish(Arg.Any(), "CLEARANCEREQUESTS", Arg.Any>(), Arg.Any()); } } diff --git a/Btms.Business/BusinessOptions.cs b/Btms.Business/BusinessOptions.cs index a62a3bdb..30de4f14 100644 --- a/Btms.Business/BusinessOptions.cs +++ b/Btms.Business/BusinessOptions.cs @@ -1,5 +1,6 @@ using System.ComponentModel.DataAnnotations; using Btms.Azure; +using Btms.Business.Commands; namespace Btms.Business; @@ -7,6 +8,42 @@ public class BusinessOptions { public const string SectionName = nameof(BusinessOptions); - [Required] public string DmpBlobRootFolder { get; set; } = "RAW"; + private readonly int defaultDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 4, 1); + [Required] public string DmpBlobRootFolder { get; set; } = "RAW"; + + public Dictionary> ConcurrencyConfiguration { get; set; } + + public enum Feature + { + BlobPaths, + BlobItems + } + public BusinessOptions() + { + ConcurrencyConfiguration = new Dictionary> + { + { + nameof(SyncNotificationsCommand), new Dictionary() + { + { Feature.BlobPaths, defaultDegreeOfParallelism }, { Feature.BlobItems, defaultDegreeOfParallelism } + } + }, + { + nameof(SyncClearanceRequestsCommand), new Dictionary() + { + { Feature.BlobPaths, defaultDegreeOfParallelism }, { Feature.BlobItems, defaultDegreeOfParallelism } + } + } + }; + } + + public int GetConcurrency(Feature feature) + { + if (ConcurrencyConfiguration.TryGetValue(typeof(T).Name, out var degreeOfParallelismDictionary)) + { + return degreeOfParallelismDictionary.GetValueOrDefault(feature, defaultDegreeOfParallelism); + } + return defaultDegreeOfParallelism; + } } \ No newline at end of file diff --git a/Btms.Business/Commands/SyncClearanceRequestsCommand.cs b/Btms.Business/Commands/SyncClearanceRequestsCommand.cs index f84431cc..0cdf415a 100644 --- a/Btms.Business/Commands/SyncClearanceRequestsCommand.cs +++ b/Btms.Business/Commands/SyncClearanceRequestsCommand.cs @@ -20,14 +20,14 @@ internal class Handler( IOptions businessOptions, ISyncJobStore syncJobStore) : SyncCommand.Handler(syncMetrics, bus, logger, sensitiveDataSerializer, - blobService, syncJobStore) + blobService, businessOptions, syncJobStore) { public override async Task Handle(SyncClearanceRequestsCommand request, CancellationToken cancellationToken) { var rootFolder = string.IsNullOrEmpty(request.RootFolder) - ? businessOptions.Value.DmpBlobRootFolder + ? Options.DmpBlobRootFolder : request.RootFolder; - await SyncBlobPaths(request.SyncPeriod, "ALVS", request.JobId, cancellationToken,$"{rootFolder}/ALVS"); + await SyncBlobPaths(request.SyncPeriod, "CLEARANCEREQUESTS", request.JobId, cancellationToken,$"{rootFolder}/ALVS"); } } diff --git a/Btms.Business/Commands/SyncDecisionsCommand.cs b/Btms.Business/Commands/SyncDecisionsCommand.cs index a549a9f1..5c8f94e0 100644 --- a/Btms.Business/Commands/SyncDecisionsCommand.cs +++ b/Btms.Business/Commands/SyncDecisionsCommand.cs @@ -19,12 +19,12 @@ internal class Handler( IBlobService blobService, IOptions businessOptions, ISyncJobStore syncJobStore) - : SyncCommand.Handler(syncMetrics, bus, logger, sensitiveDataSerializer, blobService, syncJobStore) + : SyncCommand.Handler(syncMetrics, bus, logger, sensitiveDataSerializer, blobService, businessOptions, syncJobStore) { public override async Task Handle(SyncDecisionsCommand request, CancellationToken cancellationToken) { var rootFolder = string.IsNullOrEmpty(request.RootFolder) - ? businessOptions.Value.DmpBlobRootFolder + ? Options.DmpBlobRootFolder : request.RootFolder; await SyncBlobPaths(request.SyncPeriod, "DECISIONS", request.JobId, cancellationToken,$"{rootFolder}/DECISIONS"); diff --git a/Btms.Business/Commands/SyncGmrsCommand.cs b/Btms.Business/Commands/SyncGmrsCommand.cs index d2fc0e65..be38c667 100644 --- a/Btms.Business/Commands/SyncGmrsCommand.cs +++ b/Btms.Business/Commands/SyncGmrsCommand.cs @@ -19,12 +19,12 @@ internal class Handler( IBlobService blobService, IOptions businessOptions, ISyncJobStore syncJobStore) - : SyncCommand.Handler(syncMetrics, bus, logger, sensitiveDataSerializer, blobService, syncJobStore) + : SyncCommand.Handler(syncMetrics, bus, logger, sensitiveDataSerializer, blobService, businessOptions, syncJobStore) { public override async Task Handle(SyncGmrsCommand request, CancellationToken cancellationToken) { var rootFolder = string.IsNullOrEmpty(request.RootFolder) - ? businessOptions.Value.DmpBlobRootFolder + ? Options.DmpBlobRootFolder : request.RootFolder; await SyncBlobPaths(request.SyncPeriod, "GMR", request.JobId, cancellationToken, $"{rootFolder}/GVMSAPIRESPONSE"); diff --git a/Btms.Business/Commands/SyncHandler.cs b/Btms.Business/Commands/SyncHandler.cs index b5260e8f..a18ddeda 100644 --- a/Btms.Business/Commands/SyncHandler.cs +++ b/Btms.Business/Commands/SyncHandler.cs @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Text.Json.Serialization; using Btms.Metrics; +using Microsoft.Extensions.Options; using IRequest = MediatR.IRequest; namespace Btms.Business.Commands; @@ -51,12 +52,14 @@ internal abstract class Handler( ILogger logger, ISensitiveDataSerializer sensitiveDataSerializer, IBlobService blobService, + IOptions options, ISyncJobStore syncJobStore) : MediatR.IRequestHandler where T : IRequest { - private readonly int maxDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 4, 1); - + // private readonly int defaultDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 4, 1); + protected readonly BusinessOptions Options = options.Value; + public const string ActivityName = "Btms.ProcessBlob"; public abstract Task Handle(T request, CancellationToken cancellationToken); @@ -65,20 +68,21 @@ protected async Task SyncBlobPaths(SyncPeriod period, string topic, Gu { var job = syncJobStore.GetJob(jobId); job?.Start(); + var degreeOfParallelism = options.Value.GetConcurrency(BusinessOptions.Feature.BlobPaths); using (logger.BeginScope(new List> { new("JobId", job?.JobId!), new("SyncPeriod", period.ToString()), - new("Parallelism", maxDegreeOfParallelism), + new("Parallelism", degreeOfParallelism), new("ProcessorCount", Environment.ProcessorCount), new("Command", typeof(T).Name), })) { - logger.SyncStarted(job?.JobId.ToString()!, period.ToString(), maxDegreeOfParallelism, Environment.ProcessorCount, typeof(T).Name); + logger.SyncStarted(job?.JobId.ToString()!, period.ToString(), degreeOfParallelism, Environment.ProcessorCount, typeof(T).Name); try { await Parallel.ForEachAsync(paths, - new ParallelOptions() { MaxDegreeOfParallelism = maxDegreeOfParallelism }, + new ParallelOptions() { MaxDegreeOfParallelism = degreeOfParallelism }, async (path, token) => { using (logger.BeginScope(new List> { new("SyncPath", path), })) @@ -104,8 +108,9 @@ protected async Task SyncBlobPath(string path, SyncPeriod period, stri CancellationToken cancellationToken) { var result = blobService.GetResourcesAsync($"{path}{period.GetPeriodPath()}", cancellationToken); + var degreeOfParallelism = options.Value.GetConcurrency(BusinessOptions.Feature.BlobItems); - await Parallel.ForEachAsync(result, new ParallelOptions() { CancellationToken = cancellationToken, MaxDegreeOfParallelism = maxDegreeOfParallelism }, async (item, token) => + await Parallel.ForEachAsync(result, new ParallelOptions() { CancellationToken = cancellationToken, MaxDegreeOfParallelism = degreeOfParallelism }, async (item, token) => { await SyncBlob(path, topic, item, job, cancellationToken); }); @@ -115,8 +120,10 @@ protected async Task SyncBlobPath(string path, SyncPeriod period, stri protected async Task SyncBlobs(SyncPeriod period, string topic, Guid jobId, CancellationToken cancellationToken, params string[] paths) { var job = syncJobStore.GetJob(jobId); + var degreeOfParallelism = options.Value.GetConcurrency(BusinessOptions.Feature.BlobItems); + job?.Start(); - logger.LogInformation("SyncNotifications period: {Period}, maxDegreeOfParallelism={MaxDegreeOfParallelism}, Environment.ProcessorCount={ProcessorCount}", period.ToString(), maxDegreeOfParallelism, Environment.ProcessorCount); + logger.LogInformation("SyncNotifications period: {Period}, maxDegreeOfParallelism={degreeOfParallelism}, Environment.ProcessorCount={ProcessorCount}", period.ToString(), degreeOfParallelism, Environment.ProcessorCount); try { foreach (var path in paths) diff --git a/Btms.Business/Commands/SyncNotificationsCommand.cs b/Btms.Business/Commands/SyncNotificationsCommand.cs index 5f2ef1f9..179aeda4 100644 --- a/Btms.Business/Commands/SyncNotificationsCommand.cs +++ b/Btms.Business/Commands/SyncNotificationsCommand.cs @@ -27,12 +27,12 @@ internal class Handler( IOptions businessOptions, ISyncJobStore syncJobStore) : SyncCommand.Handler(syncMetrics, bus, logger, sensitiveDataSerializer, - blobService, syncJobStore) + blobService, businessOptions, syncJobStore) { public override async Task Handle(SyncNotificationsCommand request, CancellationToken cancellationToken) { var rootFolder = string.IsNullOrEmpty(request.RootFolder) - ? businessOptions.Value.DmpBlobRootFolder + ? Options.DmpBlobRootFolder : request.RootFolder; if (request.BlobFiles.Any()) diff --git a/Btms.Consumers/ConsumerOptions.cs b/Btms.Consumers/ConsumerOptions.cs new file mode 100644 index 00000000..8ea4430f --- /dev/null +++ b/Btms.Consumers/ConsumerOptions.cs @@ -0,0 +1,15 @@ +using System.ComponentModel.DataAnnotations; +using Btms.Azure; + +namespace Btms.Consumers; + +public class ConsumerOptions +{ + public const string SectionName = nameof(ConsumerOptions); + + public int InMemoryNotifications { get; set; } = 2; + public int InMemoryGmrs { get; set; } = 2; + public int InMemoryClearanceRequests { get; set; } = 2; + public int InMemoryDecisions { get; set; } = 2; + +} \ No newline at end of file diff --git a/Btms.Consumers/Extensions/ServiceCollectionExtensions.cs b/Btms.Consumers/Extensions/ServiceCollectionExtensions.cs index 04ce9a43..5078f2ea 100644 --- a/Btms.Consumers/Extensions/ServiceCollectionExtensions.cs +++ b/Btms.Consumers/Extensions/ServiceCollectionExtensions.cs @@ -1,3 +1,5 @@ +using System.Configuration; +using Btms.Common.Extensions; using Btms.Consumers.Interceptors; using Btms.Consumers.MemoryQueue; using Btms.Metrics.Extensions; @@ -5,6 +7,7 @@ using Btms.Types.Ipaffs; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using SlimMessageBus.Host; using SlimMessageBus.Host.Interceptor; using SlimMessageBus.Host.Memory; @@ -17,6 +20,17 @@ public static class ServiceCollectionExtensions public static IServiceCollection AddConsumers(this IServiceCollection services, IConfiguration configuration) { + services.BtmsAddOptions(configuration, ConsumerOptions.SectionName); + + var consumerOpts = configuration + .GetSection(ConsumerOptions.SectionName) + .Get() ?? new ConsumerOptions(); + + // services.BtmsAddOptions(configuration, ConsumerOptions.SectionName); + // + // var consumerOpts = services.GetRequiredService>(); + + services.AddBtmsMetrics(); services.AddSingleton(); services.AddSingleton(typeof(IConsumerInterceptor<>), typeof(MetricsInterceptor<>)); @@ -42,24 +56,24 @@ public static IServiceCollection AddConsumers(this IServiceCollection services, .Produce(x => x.DefaultTopic("NOTIFICATIONS")) .Consume(x => { - x.Instances(2); + x.Instances(consumerOpts.InMemoryNotifications); x.Topic("NOTIFICATIONS").WithConsumer(); }) .Produce(x => x.DefaultTopic("GMR")) .Consume(x => { - x.Instances(2); + x.Instances(consumerOpts.InMemoryGmrs); x.Topic("GMR").WithConsumer(); }) .Produce(x => x.DefaultTopic(nameof(AlvsClearanceRequest))) .Consume(x => { - x.Instances(2); - x.Topic("ALVS").WithConsumer(); + x.Instances(consumerOpts.InMemoryClearanceRequests); + x.Topic("CLEARANCEREQUESTS").WithConsumer(); }) .Consume(x => { - x.Instances(2); + x.Instances(consumerOpts.InMemoryDecisions); x.Topic("DECISIONS").WithConsumer(); }); });