Skip to content

Commit

Permalink
Previous commit is the minimal change required, this commit includes …
Browse files Browse the repository at this point in the history
…all the cleanup of no longer required stuff.
  • Loading branch information
phatboyg committed Apr 20, 2022
1 parent 07127a4 commit 9886c9b
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 118 deletions.
25 changes: 11 additions & 14 deletions src/SampleBatch.Api/Program.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace SampleBatch.Api
namespace SampleBatch.Api
{
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;


public class Program
{
public static async Task Main(string[] args)
{
await CreateWebHostBuilder(args).Build().RunAsync();
}

public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
public static IWebHostBuilder CreateWebHostBuilder(string[] args)
{
return WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
}
}
}
}
8 changes: 2 additions & 6 deletions src/SampleBatch.Api/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,9 @@ public void ConfigureServices(IServiceCollection services)
});
}
else
{
throw new ApplicationException("Invalid Bus configuration. Couldn't find Azure or RabbitMq config");
}
});

services.AddMassTransitHostedService();

services.AddOpenApiDocument(cfg => cfg.PostProcess = d => d.Info.Title = "Sample-Batch");

services.AddStackExchangeRedisCache(options =>
Expand All @@ -86,7 +82,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp
app.UseOpenApi(); // serve OpenAPI/Swagger documents
app.UseSwaggerUi3(); // serve Swagger UI

app.UseHealthChecks("/health", new HealthCheckOptions {Predicate = check => check.Tags.Contains("ready")});
app.UseHealthChecks("/health", new HealthCheckOptions { Predicate = check => check.Tags.Contains("ready") });

app.UseEndpoints(endpoints =>
{
Expand All @@ -96,7 +92,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp
lifetime.ApplicationStarted.Register(() =>
{
var currentTimeUtc = DateTime.UtcNow.ToString(CultureInfo.InvariantCulture);
byte[] encodedCurrentTimeUtc = Encoding.UTF8.GetBytes(currentTimeUtc);
var encodedCurrentTimeUtc = Encoding.UTF8.GetBytes(currentTimeUtc);
var options = new DistributedCacheEntryOptions()
.SetSlidingExpiration(TimeSpan.FromSeconds(60));
cache.Set("cachedTimeUTC", encodedCurrentTimeUtc, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class SuspendOrderActivity :
IExecuteActivity<SuspendOrderArguments>
{
private readonly SampleBatchDbContext _dbContext;
readonly SampleBatchDbContext _dbContext;
readonly ILogger _logger;

public SuspendOrderActivity(SampleBatchDbContext dbContext, ILoggerFactory loggerFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Contracts;
using Contracts.Enums;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -55,7 +54,7 @@ await builder.AddSubscription(
builder.AddActivity(
"SuspendOrder",
new Uri("queue:suspend-order_execute"),
new {context.Message.OrderId});
new { context.Message.OrderId });

await builder.AddSubscription(
context.SourceAddress,
Expand Down
20 changes: 9 additions & 11 deletions src/SampleBatch.Components/SampleBatchDbContext.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
using Microsoft.EntityFrameworkCore;
using SampleBatch.Components.StateMachines;
using System;
using System.Collections.Generic;
using System.Text;

namespace SampleBatch.Components
namespace SampleBatch.Components
{
using Microsoft.EntityFrameworkCore;
using StateMachines;


public class SampleBatchDbContext : DbContext
{
public SampleBatchDbContext(DbContextOptions options)
: base(options)
{
}

public DbSet<BatchState> BatchStates { get; set; }
public DbSet<BatchJobState> JobStates { get; set; }

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.ApplyConfiguration(new BatchStateEntityConfiguration());
modelBuilder.ApplyConfiguration(new JobStateEntityConfiguration());
}

public DbSet<BatchState> BatchStates { get; set; }
public DbSet<BatchJobState> JobStates { get; set; }
}
}
}
30 changes: 15 additions & 15 deletions src/SampleBatch.Components/StateMachines/BatchJobStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,36 @@ public BatchJobStateMachine()

Initially(
When(BatchJobReceived)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => SetReceiveTimestamp(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Then(context => SetReceiveTimestamp(context.Saga, context.Message.Timestamp))
.Then(Initialize)
.Send(context => context.Init<ProcessBatchJob>(new
{
BatchJobId = context.Instance.CorrelationId,
Timestamp = DateTime.UtcNow,
context.Instance.BatchId,
context.Instance.OrderId,
context.Instance.Action
BatchJobId = context.Saga.CorrelationId,
InVar.Timestamp,
context.Saga.BatchId,
context.Saga.OrderId,
context.Saga.Action
}))
.TransitionTo(Received));

During(Received,
When(BatchJobCompleted)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.PublishAsync(context => context.Init<BatchJobDone>(new
{
BatchJobId = context.Instance.CorrelationId,
context.Instance.BatchId,
BatchJobId = context.Saga.CorrelationId,
context.Saga.BatchId,
InVar.Timestamp
}))
.TransitionTo(Completed),
When(BatchJobFailed)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => context.Instance.ExceptionMessage = context.Data.ExceptionInfo.Message)
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Then(context => context.Saga.ExceptionMessage = context.Message.ExceptionInfo.Message)
.PublishAsync(context => context.Init<BatchJobDone>(new
{
BatchJobId = context.Instance.CorrelationId,
context.Instance.BatchId,
BatchJobId = context.Saga.CorrelationId,
context.Saga.BatchId,
InVar.Timestamp
}))
.TransitionTo(Failed));
Expand Down Expand Up @@ -77,7 +77,7 @@ static void SetReceiveTimestamp(BatchJobState state, DateTime timestamp)

static void Initialize(BehaviorContext<BatchJobState, BatchJobReceived> context)
{
InitializeInstance(context.Instance, context.Data);
InitializeInstance(context.Saga, context.Message);
}

static void InitializeInstance(BatchJobState instance, BatchJobReceived data)
Expand Down
56 changes: 28 additions & 28 deletions src/SampleBatch.Components/StateMachines/BatchStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,49 +35,49 @@ public BatchStateMachine()

Initially(
When(BatchReceived)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => SetReceiveTimestamp(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Then(context => SetReceiveTimestamp(context.Saga, context.Message.Timestamp))
.Then(Initialize)
.IfElse(context => context.Data.DelayInSeconds.HasValue,
.IfElse(context => context.Message.DelayInSeconds.HasValue,
thenBinder => thenBinder
.Schedule(StartBatch, context => context.Init<StartBatch>(new {BatchId = context.Instance.CorrelationId}),
context => TimeSpan.FromSeconds(context.Data.DelayInSeconds.Value))
.Schedule(StartBatch, context => context.Init<StartBatch>(new { BatchId = context.Saga.CorrelationId }),
context => TimeSpan.FromSeconds(context.Message.DelayInSeconds.Value))
.TransitionTo(Received),
elseBinder => elseBinder
.ThenAsync(DispatchJobs)
.TransitionTo(Started)),
When(CancelBatch)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.TransitionTo(Finished));

During(Received,
When(StartBatch.Received)
.ThenAsync(DispatchJobs)
.TransitionTo(Started),
When(CancelBatch)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Unschedule(StartBatch)
.TransitionTo(Finished));

During(Started,
When(BatchJobDone)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Then(FinalizeJob)
.IfElse(context => context.Instance.UnprocessedOrderIds.Count == 0 && context.Instance.ProcessingOrderIds.Count == 0,
.IfElse(context => context.Saga.UnprocessedOrderIds.Count == 0 && context.Saga.ProcessingOrderIds.Count == 0,
binder => binder
.TransitionTo(Finished),
binder => binder
.ThenAsync(DispatchJobs)),
When(CancelBatch)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.TransitionTo(Cancelling));

// We continue receiving Job Done events, but don't Dispatch any new jobs
During(Cancelling,
When(BatchJobDone)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Then(FinalizeJob)
.If(context => context.Instance.ProcessingOrderIds.Count == 0,
.If(context => context.Saga.ProcessingOrderIds.Count == 0,
binder => binder.TransitionTo(Finished)));

During(Finished, Ignore(CancelBatch));
Expand All @@ -86,15 +86,15 @@ public BatchStateMachine()
When(StateRequested)
.RespondAsync(async x => await x.Init<BatchStatus>(new
{
BatchId = x.Instance.CorrelationId,
BatchId = x.Saga.CorrelationId,
InVar.Timestamp,
ProcessingJobCount = x.Instance.ProcessingOrderIds.Count,
UnprocessedJobCount = x.Instance.UnprocessedOrderIds.Count,
State = (await this.GetState(x)).Name
ProcessingJobCount = x.Saga.ProcessingOrderIds.Count,
UnprocessedJobCount = x.Saga.UnprocessedOrderIds.Count,
State = this.GetState(x)
})),
When(BatchReceived)
.Then(context => Touch(context.Instance, context.Data.Timestamp))
.Then(context => SetReceiveTimestamp(context.Instance, context.Data.Timestamp))
.Then(context => Touch(context.Saga, context.Message.Timestamp))
.Then(context => SetReceiveTimestamp(context.Saga, context.Message.Timestamp))
.Then(Initialize));
}

Expand Down Expand Up @@ -126,7 +126,7 @@ static void SetReceiveTimestamp(BatchState state, DateTime timestamp)

static void Initialize(BehaviorContext<BatchState, BatchReceived> context)
{
InitializeInstance(context.Instance, context.Data);
InitializeInstance(context.Saga, context.Message);
}

static void InitializeInstance(BatchState instance, BatchReceived data)
Expand All @@ -141,31 +141,31 @@ static async Task DispatchJobs(BehaviorContext<BatchState> context)
{
var jobsToSend = new List<Task>();

while (context.Instance.UnprocessedOrderIds.Any()
&& context.Instance.ProcessingOrderIds.Count < context.Instance.ActiveThreshold)
while (context.Saga.UnprocessedOrderIds.Any()
&& context.Saga.ProcessingOrderIds.Count < context.Saga.ActiveThreshold)
jobsToSend.Add(InitiateJob(context));

await Task.WhenAll(jobsToSend);
}

static Task InitiateJob(BehaviorContext<BatchState> context)
static Task InitiateJob(SagaConsumeContext<BatchState> context)
{
var orderId = context.Instance.UnprocessedOrderIds.Pop();
var orderId = context.Saga.UnprocessedOrderIds.Pop();
var batchJobId = NewId.NextGuid();
context.Instance.ProcessingOrderIds.Add(batchJobId, orderId);
return context.GetPayload<ConsumeContext>().Publish<BatchJobReceived>(new
context.Saga.ProcessingOrderIds.Add(batchJobId, orderId);
return context.Publish<BatchJobReceived>(new
{
BatchJobId = batchJobId,
InVar.Timestamp,
BatchId = context.Instance.CorrelationId,
BatchId = context.Saga.CorrelationId,
OrderId = orderId,
context.Instance.Action
context.Saga.Action
});
}

static void FinalizeJob(BehaviorContext<BatchState, BatchJobDone> context)
{
context.Instance.ProcessingOrderIds.Remove(context.Data.BatchJobId);
context.Saga.ProcessingOrderIds.Remove(context.Message.BatchJobId);
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/SampleBatch.Service/EfDbCreatedHostedService.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using MassTransit;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace SampleBatch.Service
namespace SampleBatch.Service
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;


public class EfDbCreatedHostedService :
IHostedService
{
Expand Down Expand Up @@ -35,4 +35,4 @@ public Task StopAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}
}
}
}
29 changes: 0 additions & 29 deletions src/SampleBatch.Service/MassTransitConsoleHostedService.cs

This file was deleted.

2 changes: 0 additions & 2 deletions src/SampleBatch.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ static async Task Main(string[] args)

services.AddDbContext<SampleBatchDbContext>(x => x.UseSqlServer(hostContext.Configuration.GetConnectionString("sample-batch")));

services.AddHostedService<MassTransitConsoleHostedService>();

// So we don't need to use ef migrations for this sample.
// Likely if you are going to deploy to a production environment, you want a better DB deploy strategy.
services.AddHostedService<EfDbCreatedHostedService>();
Expand Down

0 comments on commit 9886c9b

Please sign in to comment.