From 9886c9bcb94c99b803ce48dc08b7f56cada6626a Mon Sep 17 00:00:00 2001 From: Chris Patterson Date: Wed, 20 Apr 2022 07:10:20 -0500 Subject: [PATCH] Previous commit is the minimal change required, this commit includes all the cleanup of no longer required stuff. --- src/SampleBatch.Api/Program.cs | 25 ++++----- src/SampleBatch.Api/Startup.cs | 8 +-- .../SuspendOrder/SuspendOrderActivity.cs | 2 +- .../Consumers/ProcessBatchJobConsumer.cs | 3 +- .../SampleBatchDbContext.cs | 20 +++---- .../StateMachines/BatchJobStateMachine.cs | 30 +++++----- .../StateMachines/BatchStateMachine.cs | 56 +++++++++---------- .../EfDbCreatedHostedService.cs | 20 +++---- .../MassTransitConsoleHostedService.cs | 29 ---------- src/SampleBatch.Service/Program.cs | 2 - 10 files changed, 77 insertions(+), 118 deletions(-) delete mode 100644 src/SampleBatch.Service/MassTransitConsoleHostedService.cs diff --git a/src/SampleBatch.Api/Program.cs b/src/SampleBatch.Api/Program.cs index 319ee65..50d2dfa 100644 --- a/src/SampleBatch.Api/Program.cs +++ b/src/SampleBatch.Api/Program.cs @@ -1,15 +1,10 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.AspNetCore; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; - -namespace SampleBatch.Api +namespace SampleBatch.Api { + using System.Threading.Tasks; + using Microsoft.AspNetCore; + using Microsoft.AspNetCore.Hosting; + + public class Program { public static async Task Main(string[] args) @@ -17,8 +12,10 @@ public static async Task Main(string[] args) await CreateWebHostBuilder(args).Build().RunAsync(); } - public static IWebHostBuilder CreateWebHostBuilder(string[] args) => - WebHost.CreateDefaultBuilder(args) + public static IWebHostBuilder CreateWebHostBuilder(string[] args) + { + return WebHost.CreateDefaultBuilder(args) .UseStartup(); + } } -} +} \ No newline at end of file diff --git a/src/SampleBatch.Api/Startup.cs b/src/SampleBatch.Api/Startup.cs index def7ce3..fc3385b 100644 --- a/src/SampleBatch.Api/Startup.cs +++ b/src/SampleBatch.Api/Startup.cs @@ -60,13 +60,9 @@ public void ConfigureServices(IServiceCollection services) }); } else - { throw new ApplicationException("Invalid Bus configuration. Couldn't find Azure or RabbitMq config"); - } }); - services.AddMassTransitHostedService(); - services.AddOpenApiDocument(cfg => cfg.PostProcess = d => d.Info.Title = "Sample-Batch"); services.AddStackExchangeRedisCache(options => @@ -86,7 +82,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp app.UseOpenApi(); // serve OpenAPI/Swagger documents app.UseSwaggerUi3(); // serve Swagger UI - app.UseHealthChecks("/health", new HealthCheckOptions {Predicate = check => check.Tags.Contains("ready")}); + app.UseHealthChecks("/health", new HealthCheckOptions { Predicate = check => check.Tags.Contains("ready") }); app.UseEndpoints(endpoints => { @@ -96,7 +92,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp lifetime.ApplicationStarted.Register(() => { var currentTimeUtc = DateTime.UtcNow.ToString(CultureInfo.InvariantCulture); - byte[] encodedCurrentTimeUtc = Encoding.UTF8.GetBytes(currentTimeUtc); + var encodedCurrentTimeUtc = Encoding.UTF8.GetBytes(currentTimeUtc); var options = new DistributedCacheEntryOptions() .SetSlidingExpiration(TimeSpan.FromSeconds(60)); cache.Set("cachedTimeUTC", encodedCurrentTimeUtc, options); diff --git a/src/SampleBatch.Components/Activities/SuspendOrder/SuspendOrderActivity.cs b/src/SampleBatch.Components/Activities/SuspendOrder/SuspendOrderActivity.cs index 49980cc..0351a95 100644 --- a/src/SampleBatch.Components/Activities/SuspendOrder/SuspendOrderActivity.cs +++ b/src/SampleBatch.Components/Activities/SuspendOrder/SuspendOrderActivity.cs @@ -9,7 +9,7 @@ public class SuspendOrderActivity : IExecuteActivity { - private readonly SampleBatchDbContext _dbContext; + readonly SampleBatchDbContext _dbContext; readonly ILogger _logger; public SuspendOrderActivity(SampleBatchDbContext dbContext, ILoggerFactory loggerFactory) diff --git a/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs b/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs index e0ba757..f5f0921 100644 --- a/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs +++ b/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs @@ -5,7 +5,6 @@ using Contracts; using Contracts.Enums; using MassTransit; - using MassTransit.Courier; using MassTransit.Courier.Contracts; using Microsoft.Extensions.Logging; @@ -55,7 +54,7 @@ await builder.AddSubscription( builder.AddActivity( "SuspendOrder", new Uri("queue:suspend-order_execute"), - new {context.Message.OrderId}); + new { context.Message.OrderId }); await builder.AddSubscription( context.SourceAddress, diff --git a/src/SampleBatch.Components/SampleBatchDbContext.cs b/src/SampleBatch.Components/SampleBatchDbContext.cs index 072a77a..3a48cda 100644 --- a/src/SampleBatch.Components/SampleBatchDbContext.cs +++ b/src/SampleBatch.Components/SampleBatchDbContext.cs @@ -1,11 +1,9 @@ -using Microsoft.EntityFrameworkCore; -using SampleBatch.Components.StateMachines; -using System; -using System.Collections.Generic; -using System.Text; - -namespace SampleBatch.Components +namespace SampleBatch.Components { + using Microsoft.EntityFrameworkCore; + using StateMachines; + + public class SampleBatchDbContext : DbContext { public SampleBatchDbContext(DbContextOptions options) @@ -13,13 +11,13 @@ public SampleBatchDbContext(DbContextOptions options) { } + public DbSet BatchStates { get; set; } + public DbSet JobStates { get; set; } + protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.ApplyConfiguration(new BatchStateEntityConfiguration()); modelBuilder.ApplyConfiguration(new JobStateEntityConfiguration()); } - - public DbSet BatchStates { get; set; } - public DbSet JobStates { get; set; } } -} +} \ No newline at end of file diff --git a/src/SampleBatch.Components/StateMachines/BatchJobStateMachine.cs b/src/SampleBatch.Components/StateMachines/BatchJobStateMachine.cs index 168a4c3..bc8ff31 100644 --- a/src/SampleBatch.Components/StateMachines/BatchJobStateMachine.cs +++ b/src/SampleBatch.Components/StateMachines/BatchJobStateMachine.cs @@ -18,36 +18,36 @@ public BatchJobStateMachine() Initially( When(BatchJobReceived) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) - .Then(context => SetReceiveTimestamp(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) + .Then(context => SetReceiveTimestamp(context.Saga, context.Message.Timestamp)) .Then(Initialize) .Send(context => context.Init(new { - BatchJobId = context.Instance.CorrelationId, - Timestamp = DateTime.UtcNow, - context.Instance.BatchId, - context.Instance.OrderId, - context.Instance.Action + BatchJobId = context.Saga.CorrelationId, + InVar.Timestamp, + context.Saga.BatchId, + context.Saga.OrderId, + context.Saga.Action })) .TransitionTo(Received)); During(Received, When(BatchJobCompleted) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) .PublishAsync(context => context.Init(new { - BatchJobId = context.Instance.CorrelationId, - context.Instance.BatchId, + BatchJobId = context.Saga.CorrelationId, + context.Saga.BatchId, InVar.Timestamp })) .TransitionTo(Completed), When(BatchJobFailed) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) - .Then(context => context.Instance.ExceptionMessage = context.Data.ExceptionInfo.Message) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) + .Then(context => context.Saga.ExceptionMessage = context.Message.ExceptionInfo.Message) .PublishAsync(context => context.Init(new { - BatchJobId = context.Instance.CorrelationId, - context.Instance.BatchId, + BatchJobId = context.Saga.CorrelationId, + context.Saga.BatchId, InVar.Timestamp })) .TransitionTo(Failed)); @@ -77,7 +77,7 @@ static void SetReceiveTimestamp(BatchJobState state, DateTime timestamp) static void Initialize(BehaviorContext context) { - InitializeInstance(context.Instance, context.Data); + InitializeInstance(context.Saga, context.Message); } static void InitializeInstance(BatchJobState instance, BatchJobReceived data) diff --git a/src/SampleBatch.Components/StateMachines/BatchStateMachine.cs b/src/SampleBatch.Components/StateMachines/BatchStateMachine.cs index 680b6ff..5f942c1 100644 --- a/src/SampleBatch.Components/StateMachines/BatchStateMachine.cs +++ b/src/SampleBatch.Components/StateMachines/BatchStateMachine.cs @@ -35,19 +35,19 @@ public BatchStateMachine() Initially( When(BatchReceived) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) - .Then(context => SetReceiveTimestamp(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) + .Then(context => SetReceiveTimestamp(context.Saga, context.Message.Timestamp)) .Then(Initialize) - .IfElse(context => context.Data.DelayInSeconds.HasValue, + .IfElse(context => context.Message.DelayInSeconds.HasValue, thenBinder => thenBinder - .Schedule(StartBatch, context => context.Init(new {BatchId = context.Instance.CorrelationId}), - context => TimeSpan.FromSeconds(context.Data.DelayInSeconds.Value)) + .Schedule(StartBatch, context => context.Init(new { BatchId = context.Saga.CorrelationId }), + context => TimeSpan.FromSeconds(context.Message.DelayInSeconds.Value)) .TransitionTo(Received), elseBinder => elseBinder .ThenAsync(DispatchJobs) .TransitionTo(Started)), When(CancelBatch) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) .TransitionTo(Finished)); During(Received, @@ -55,29 +55,29 @@ public BatchStateMachine() .ThenAsync(DispatchJobs) .TransitionTo(Started), When(CancelBatch) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) .Unschedule(StartBatch) .TransitionTo(Finished)); During(Started, When(BatchJobDone) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) .Then(FinalizeJob) - .IfElse(context => context.Instance.UnprocessedOrderIds.Count == 0 && context.Instance.ProcessingOrderIds.Count == 0, + .IfElse(context => context.Saga.UnprocessedOrderIds.Count == 0 && context.Saga.ProcessingOrderIds.Count == 0, binder => binder .TransitionTo(Finished), binder => binder .ThenAsync(DispatchJobs)), When(CancelBatch) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) .TransitionTo(Cancelling)); // We continue receiving Job Done events, but don't Dispatch any new jobs During(Cancelling, When(BatchJobDone) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) .Then(FinalizeJob) - .If(context => context.Instance.ProcessingOrderIds.Count == 0, + .If(context => context.Saga.ProcessingOrderIds.Count == 0, binder => binder.TransitionTo(Finished))); During(Finished, Ignore(CancelBatch)); @@ -86,15 +86,15 @@ public BatchStateMachine() When(StateRequested) .RespondAsync(async x => await x.Init(new { - BatchId = x.Instance.CorrelationId, + BatchId = x.Saga.CorrelationId, InVar.Timestamp, - ProcessingJobCount = x.Instance.ProcessingOrderIds.Count, - UnprocessedJobCount = x.Instance.UnprocessedOrderIds.Count, - State = (await this.GetState(x)).Name + ProcessingJobCount = x.Saga.ProcessingOrderIds.Count, + UnprocessedJobCount = x.Saga.UnprocessedOrderIds.Count, + State = this.GetState(x) })), When(BatchReceived) - .Then(context => Touch(context.Instance, context.Data.Timestamp)) - .Then(context => SetReceiveTimestamp(context.Instance, context.Data.Timestamp)) + .Then(context => Touch(context.Saga, context.Message.Timestamp)) + .Then(context => SetReceiveTimestamp(context.Saga, context.Message.Timestamp)) .Then(Initialize)); } @@ -126,7 +126,7 @@ static void SetReceiveTimestamp(BatchState state, DateTime timestamp) static void Initialize(BehaviorContext context) { - InitializeInstance(context.Instance, context.Data); + InitializeInstance(context.Saga, context.Message); } static void InitializeInstance(BatchState instance, BatchReceived data) @@ -141,31 +141,31 @@ static async Task DispatchJobs(BehaviorContext context) { var jobsToSend = new List(); - while (context.Instance.UnprocessedOrderIds.Any() - && context.Instance.ProcessingOrderIds.Count < context.Instance.ActiveThreshold) + while (context.Saga.UnprocessedOrderIds.Any() + && context.Saga.ProcessingOrderIds.Count < context.Saga.ActiveThreshold) jobsToSend.Add(InitiateJob(context)); await Task.WhenAll(jobsToSend); } - static Task InitiateJob(BehaviorContext context) + static Task InitiateJob(SagaConsumeContext context) { - var orderId = context.Instance.UnprocessedOrderIds.Pop(); + var orderId = context.Saga.UnprocessedOrderIds.Pop(); var batchJobId = NewId.NextGuid(); - context.Instance.ProcessingOrderIds.Add(batchJobId, orderId); - return context.GetPayload().Publish(new + context.Saga.ProcessingOrderIds.Add(batchJobId, orderId); + return context.Publish(new { BatchJobId = batchJobId, InVar.Timestamp, - BatchId = context.Instance.CorrelationId, + BatchId = context.Saga.CorrelationId, OrderId = orderId, - context.Instance.Action + context.Saga.Action }); } static void FinalizeJob(BehaviorContext context) { - context.Instance.ProcessingOrderIds.Remove(context.Data.BatchJobId); + context.Saga.ProcessingOrderIds.Remove(context.Message.BatchJobId); } } diff --git a/src/SampleBatch.Service/EfDbCreatedHostedService.cs b/src/SampleBatch.Service/EfDbCreatedHostedService.cs index 39cd22d..b3ec8e2 100644 --- a/src/SampleBatch.Service/EfDbCreatedHostedService.cs +++ b/src/SampleBatch.Service/EfDbCreatedHostedService.cs @@ -1,13 +1,13 @@ -using MassTransit; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace SampleBatch.Service +namespace SampleBatch.Service { + using System; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.EntityFrameworkCore; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Hosting; + + public class EfDbCreatedHostedService : IHostedService { @@ -35,4 +35,4 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } } -} +} \ No newline at end of file diff --git a/src/SampleBatch.Service/MassTransitConsoleHostedService.cs b/src/SampleBatch.Service/MassTransitConsoleHostedService.cs deleted file mode 100644 index 7036239..0000000 --- a/src/SampleBatch.Service/MassTransitConsoleHostedService.cs +++ /dev/null @@ -1,29 +0,0 @@ -namespace SampleBatch.Service -{ - using System.Threading; - using System.Threading.Tasks; - using MassTransit; - using Microsoft.Extensions.Hosting; - - - public class MassTransitConsoleHostedService : - IHostedService - { - readonly IBusControl _bus; - - public MassTransitConsoleHostedService(IBusControl bus) - { - _bus = bus; - } - - public async Task StartAsync(CancellationToken cancellationToken) - { - await _bus.StartAsync(cancellationToken).ConfigureAwait(false); - } - - public Task StopAsync(CancellationToken cancellationToken) - { - return _bus.StopAsync(cancellationToken); - } - } -} \ No newline at end of file diff --git a/src/SampleBatch.Service/Program.cs b/src/SampleBatch.Service/Program.cs index 0a64f91..e71abd7 100644 --- a/src/SampleBatch.Service/Program.cs +++ b/src/SampleBatch.Service/Program.cs @@ -115,8 +115,6 @@ static async Task Main(string[] args) services.AddDbContext(x => x.UseSqlServer(hostContext.Configuration.GetConnectionString("sample-batch"))); - services.AddHostedService(); - // So we don't need to use ef migrations for this sample. // Likely if you are going to deploy to a production environment, you want a better DB deploy strategy. services.AddHostedService();