diff --git a/src/SampleBatch.Api/Controllers/BatchJobsController.cs b/src/SampleBatch.Api/Controllers/BatchJobsController.cs index 43d2853..456724e 100644 --- a/src/SampleBatch.Api/Controllers/BatchJobsController.cs +++ b/src/SampleBatch.Api/Controllers/BatchJobsController.cs @@ -88,7 +88,7 @@ public async Task> Post(int jobCount = 100, int activeThresho { BatchId = id, InVar.Timestamp, - Action = BatchAction.CancelOrders, + Action = BatchActionEnum.CancelOrders, OrderIds = orderIds.ToArray(), ActiveThreshold = activeThreshold, DelayInSeconds = delayInSeconds diff --git a/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs b/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs index e0ba757..6bc94b5 100644 --- a/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs +++ b/src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs @@ -1,9 +1,7 @@ namespace SampleBatch.Components.Consumers { - using System; using System.Threading.Tasks; using Contracts; - using Contracts.Enums; using MassTransit; using MassTransit.Courier; using MassTransit.Courier.Contracts; @@ -24,64 +22,21 @@ public async Task Consume(ConsumeContext context) { using (_log.BeginScope("ProcessBatchJob {BatchJobId}, {OrderId}", context.Message.BatchJobId, context.Message.OrderId)) { - var builder = new RoutingSlipBuilder(NewId.NextGuid()); - - switch (context.Message.Action) + var routingSlip = await context.Message.Action.SetupRoutingSlip(context, async builder => { - case BatchAction.CancelOrders: - builder.AddActivity( - "CancelOrder", - new Uri("queue:cancel-order_execute"), - new - { - context.Message.OrderId, - Reason = "Product discontinued" - }); - - await builder.AddSubscription( - context.SourceAddress, - RoutingSlipEvents.ActivityFaulted, - RoutingSlipEventContents.None, - "CancelOrder", - x => x.Send(new - { - context.Message.BatchJobId, - context.Message.BatchId, - context.Message.OrderId - })); - break; - - case BatchAction.SuspendOrders: - builder.AddActivity( - "SuspendOrder", - new Uri("queue:suspend-order_execute"), - new {context.Message.OrderId}); - - await builder.AddSubscription( - context.SourceAddress, - RoutingSlipEvents.ActivityFaulted, - RoutingSlipEventContents.None, - "SuspendOrder", - x => x.Send(new - { - context.Message.BatchJobId, - context.Message.BatchId, - context.Message.OrderId - })); - break; - } - - await builder.AddSubscription( - context.SourceAddress, - RoutingSlipEvents.Completed, - x => x.Send(new - { - context.Message.BatchJobId, - context.Message.BatchId - })); - - await context.Execute(builder.Build()); + await builder.AddSubscription( + context.SourceAddress, + RoutingSlipEvents.Completed, + x => x.Send(new + { + context.Message.BatchJobId, + context.Message.BatchId, + InVar.Timestamp + })); + }); + + await context.Execute(routingSlip); } } } -} \ No newline at end of file +} diff --git a/src/SampleBatch.Components/SampleBatchDbContext.cs b/src/SampleBatch.Components/SampleBatchDbContext.cs index c83dabe..ef1e497 100644 --- a/src/SampleBatch.Components/SampleBatchDbContext.cs +++ b/src/SampleBatch.Components/SampleBatchDbContext.cs @@ -1,8 +1,6 @@ using Microsoft.EntityFrameworkCore; using SampleBatch.Components.StateMachines; -using System; -using System.Collections.Generic; -using System.Text; + namespace SampleBatch.Components { diff --git a/src/SampleBatch.Components/StateMachines/BatchState.cs b/src/SampleBatch.Components/StateMachines/BatchState.cs index 36e3023..20fedce 100644 --- a/src/SampleBatch.Components/StateMachines/BatchState.cs +++ b/src/SampleBatch.Components/StateMachines/BatchState.cs @@ -19,12 +19,12 @@ public class BatchState : public DateTime? UpdateTimestamp { get; set; } - public BatchAction? Action { get; set; } + public BatchActionEnum Action { get; set; } - /// - /// The maximum amount of active Jobs allowed to be processing. Typically an amount larger than your Job Consumer can handle concurrently, to allow for some additional prefetch while the Batch Saga dispatches more - /// - public int? ActiveThreshold { get; set; } = 20; + /// + /// The maximum amount of active Jobs allowed to be processing. Typically an amount larger than your Job Consumer can handle concurrently, to allow for some additional prefetch while the Batch Saga dispatches more + /// + public int? ActiveThreshold { get; set; } = 20; public int? Total { get; set; } diff --git a/src/SampleBatch.Components/StateMachines/BatchStateEntityConfiguration.cs b/src/SampleBatch.Components/StateMachines/BatchStateEntityConfiguration.cs index 935827a..58d4881 100644 --- a/src/SampleBatch.Components/StateMachines/BatchStateEntityConfiguration.cs +++ b/src/SampleBatch.Components/StateMachines/BatchStateEntityConfiguration.cs @@ -2,11 +2,11 @@ { using System; using System.Collections.Generic; + using System.Linq; using Common; using Contracts.Enums; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata.Builders; - using Microsoft.EntityFrameworkCore.Storage.ValueConversion; class BatchStateEntityConfiguration : @@ -22,8 +22,9 @@ public void Configure(EntityTypeBuilder builder) builder.Property(c => c.CurrentState).IsRequired(); - builder.Property(c => c.Action) - .HasConversion(new EnumToStringConverter()); + builder.Property(p => p.Action) + .HasConversion(v => v.Value, i => BatchActionEnum.List().FirstOrDefault(e => e.Value == i)); + builder.Property(c => c.UnprocessedOrderIds) .HasConversion(new JsonValueConverter>()) @@ -34,4 +35,4 @@ public void Configure(EntityTypeBuilder builder) .Metadata.SetValueComparer(new JsonValueComparer>()); } } -} \ No newline at end of file +} diff --git a/src/SampleBatch.Components/StateMachines/JobState.cs b/src/SampleBatch.Components/StateMachines/JobState.cs index 9b45659..a589024 100644 --- a/src/SampleBatch.Components/StateMachines/JobState.cs +++ b/src/SampleBatch.Components/StateMachines/JobState.cs @@ -20,7 +20,7 @@ public class JobState : public DateTime? UpdateTimestamp { get; set; } - public BatchAction Action { get; set; } + public BatchActionEnum Action { get; set; } public string ExceptionMessage { get; set; } diff --git a/src/SampleBatch.Components/StateMachines/JobStateEntityConfiguration.cs b/src/SampleBatch.Components/StateMachines/JobStateEntityConfiguration.cs index d6f6635..ddc0208 100644 --- a/src/SampleBatch.Components/StateMachines/JobStateEntityConfiguration.cs +++ b/src/SampleBatch.Components/StateMachines/JobStateEntityConfiguration.cs @@ -1,9 +1,9 @@ namespace SampleBatch.Components.StateMachines { + using System.Linq; using Contracts.Enums; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata.Builders; - using Microsoft.EntityFrameworkCore.Storage.ValueConversion; class JobStateEntityConfiguration : @@ -19,8 +19,8 @@ public void Configure(EntityTypeBuilder builder) builder.Property(c => c.CurrentState).IsRequired(); - builder.Property(c => c.Action) - .HasConversion(new EnumToStringConverter()); + builder.Property(p => p.Action) + .HasConversion(v => v.Value, i => BatchActionEnum.List().FirstOrDefault(e => e.Value == i)); } } -} \ No newline at end of file +} diff --git a/src/SampleBatch.Contracts/BatchJobReceived.cs b/src/SampleBatch.Contracts/BatchJobReceived.cs index 941098c..3858ecc 100644 --- a/src/SampleBatch.Contracts/BatchJobReceived.cs +++ b/src/SampleBatch.Contracts/BatchJobReceived.cs @@ -10,6 +10,6 @@ public interface BatchJobReceived Guid BatchId { get; } Guid OrderId { get; } DateTime Timestamp { get; } - BatchAction Action { get; } + BatchActionEnum Action { get; } } } \ No newline at end of file diff --git a/src/SampleBatch.Contracts/BatchReceived.cs b/src/SampleBatch.Contracts/BatchReceived.cs index f908a7a..7b7f603 100644 --- a/src/SampleBatch.Contracts/BatchReceived.cs +++ b/src/SampleBatch.Contracts/BatchReceived.cs @@ -8,7 +8,7 @@ public interface BatchReceived { Guid BatchId { get; } DateTime Timestamp { get; } - BatchAction Action { get; } + BatchActionEnum Action { get; } Guid[] OrderIds { get; } int ActiveThreshold { get; } diff --git a/src/SampleBatch.Contracts/Enums/BatchAction.cs b/src/SampleBatch.Contracts/Enums/BatchAction.cs deleted file mode 100644 index 2b0a779..0000000 --- a/src/SampleBatch.Contracts/Enums/BatchAction.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace SampleBatch.Contracts.Enums -{ - public enum BatchAction - { - CancelOrders = 1, - SuspendOrders = 2 - } -} diff --git a/src/SampleBatch.Contracts/Enums/BatchActionEnum.cs b/src/SampleBatch.Contracts/Enums/BatchActionEnum.cs new file mode 100644 index 0000000..78b65ff --- /dev/null +++ b/src/SampleBatch.Contracts/Enums/BatchActionEnum.cs @@ -0,0 +1,50 @@ +namespace SampleBatch.Contracts.Enums +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using Converter; + using Internal; + using MassTransit; + using MassTransit.Courier; + using MassTransit.Courier.Contracts; + using Newtonsoft.Json; + + + [JsonConverter(typeof(BatchActionEnumConverter))] + public abstract class BatchActionEnum + { + public static readonly BatchActionEnum CancelOrders = new CancelOrdersEnum(); + public static readonly BatchActionEnum SuspendOrders = new SuspendOrdersEnum(); + + public int Value { get; private set; } + public string Name { get; private set; } + + public static IEnumerable List() + { + yield return CancelOrders; + yield return SuspendOrders; + } + + protected BatchActionEnum(int value, string name) + { + Value = value; + Name = name; + } + + public async Task SetupRoutingSlip(ConsumeContext context, Func commonAction) + { + var builder = new RoutingSlipBuilder(NewId.NextGuid()); + + await SetupRoutingSlip(builder, context); + + await commonAction?.Invoke(builder); + + return builder.Build(); + + } + + protected abstract Task SetupRoutingSlip(RoutingSlipBuilder builder, ConsumeContext context); + + } +} diff --git a/src/SampleBatch.Contracts/Enums/Converter/BatchActionEnumConverter.cs b/src/SampleBatch.Contracts/Enums/Converter/BatchActionEnumConverter.cs new file mode 100644 index 0000000..ddc3b95 --- /dev/null +++ b/src/SampleBatch.Contracts/Enums/Converter/BatchActionEnumConverter.cs @@ -0,0 +1,53 @@ +namespace SampleBatch.Contracts.Enums.Converter +{ + using System; + using Internal; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + using Newtonsoft.Json.Serialization; + + + public class BaseSpecifiedConcreteClassConverter : DefaultContractResolver + { + protected override JsonConverter ResolveContractConverter(Type objectType) + { + if (typeof(BatchActionEnum).IsAssignableFrom(objectType) && !objectType.IsAbstract) + return null; // pretend TableSortRuleConvert is not specified (thus avoiding a stack overflow) + return base.ResolveContractConverter(objectType); + } + } + + public class BatchActionEnumConverter : JsonConverter + { + static readonly JsonSerializerSettings SpecifiedSubclassConversion = new JsonSerializerSettings() { ContractResolver = new BaseSpecifiedConcreteClassConverter() }; + + public override bool CanConvert(Type objectType) + { + return (objectType == typeof(BatchActionEnum)); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + JObject jo = JObject.Load(reader); + switch (jo["value"].Value()) + { + case 1: + return JsonConvert.DeserializeObject(jo.ToString(), SpecifiedSubclassConversion); + case 2: + return JsonConvert.DeserializeObject(jo.ToString(), SpecifiedSubclassConversion); + default: + throw new Exception(); + } + throw new NotImplementedException(); + } + + public override bool CanWrite { + get { return false; } + } + + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + throw new NotImplementedException(); // won't be called because CanWrite returns false + } + } +} \ No newline at end of file diff --git a/src/SampleBatch.Contracts/Enums/Internal/CancelOrdersEnum.cs b/src/SampleBatch.Contracts/Enums/Internal/CancelOrdersEnum.cs new file mode 100644 index 0000000..fdc9597 --- /dev/null +++ b/src/SampleBatch.Contracts/Enums/Internal/CancelOrdersEnum.cs @@ -0,0 +1,41 @@ +namespace SampleBatch.Contracts.Enums.Internal +{ + using System; + using System.Threading.Tasks; + using MassTransit; + using MassTransit.Courier; + using MassTransit.Courier.Contracts; + + + class CancelOrdersEnum : BatchActionEnum + { + public CancelOrdersEnum() + : base(1, "Cancel Orders") + { + } + + protected override async Task SetupRoutingSlip( RoutingSlipBuilder builder, ConsumeContext context) + { + builder.AddActivity( + "CancelOrder", + new Uri("queue:cancel-order_execute"), + new + { + context.Message.OrderId, + Reason = "Product discontinued" + }); + + await builder.AddSubscription( + context.SourceAddress, + RoutingSlipEvents.ActivityFaulted, + RoutingSlipEventContents.None, + "CancelOrder", + x => x.Send(new + { + context.Message.BatchJobId, + context.Message.BatchId, + context.Message.OrderId + })); + } + } +} diff --git a/src/SampleBatch.Contracts/Enums/Internal/SuspendOrdersEnum.cs b/src/SampleBatch.Contracts/Enums/Internal/SuspendOrdersEnum.cs new file mode 100644 index 0000000..42308c5 --- /dev/null +++ b/src/SampleBatch.Contracts/Enums/Internal/SuspendOrdersEnum.cs @@ -0,0 +1,37 @@ +namespace SampleBatch.Contracts.Enums.Internal +{ + using System; + using System.Threading.Tasks; + using MassTransit; + using MassTransit.Courier; + using MassTransit.Courier.Contracts; + + + class SuspendOrdersEnum : BatchActionEnum + { + public SuspendOrdersEnum() + : base(2, "Suspend Orders") + { + } + + protected override async Task SetupRoutingSlip(RoutingSlipBuilder builder, ConsumeContext context) + { + builder.AddActivity( + "SuspendOrder", + new Uri("queue:suspend-order_execute"), + new { context.Message.OrderId }); + + await builder.AddSubscription( + context.SourceAddress, + RoutingSlipEvents.ActivityFaulted, + RoutingSlipEventContents.None, + "SuspendOrder", + x => x.Send(new + { + context.Message.BatchJobId, + context.Message.BatchId, + context.Message.OrderId + })); + } + } +} diff --git a/src/SampleBatch.Contracts/ProcessBatchJob.cs b/src/SampleBatch.Contracts/ProcessBatchJob.cs index 0a5e38d..5132e8d 100644 --- a/src/SampleBatch.Contracts/ProcessBatchJob.cs +++ b/src/SampleBatch.Contracts/ProcessBatchJob.cs @@ -10,6 +10,6 @@ public interface ProcessBatchJob Guid BatchId { get; } DateTime Timestamp { get; } Guid OrderId { get; } - BatchAction Action { get; } + BatchActionEnum Action { get; } } } \ No newline at end of file diff --git a/src/SampleBatch.Contracts/SubmitBatch.cs b/src/SampleBatch.Contracts/SubmitBatch.cs index 82e8b63..858a1c8 100644 --- a/src/SampleBatch.Contracts/SubmitBatch.cs +++ b/src/SampleBatch.Contracts/SubmitBatch.cs @@ -10,7 +10,7 @@ public interface SubmitBatch DateTime Timestamp { get; } - BatchAction Action { get; } + BatchActionEnum Action { get; } Guid[] OrderIds { get; } diff --git a/src/SampleBatch.Tests/Integration/BatchStateMachineTests.cs b/src/SampleBatch.Tests/Integration/BatchStateMachineTests.cs index f23db78..d85baba 100644 --- a/src/SampleBatch.Tests/Integration/BatchStateMachineTests.cs +++ b/src/SampleBatch.Tests/Integration/BatchStateMachineTests.cs @@ -81,7 +81,7 @@ public async Task should_complete_successfully() { BatchId = NewId.NextGuid(), Timestamp = DateTime.UtcNow, - BatchAction = BatchAction.CancelOrders, + BatchAction = BatchActionEnum.CancelOrders, ActiveThreshold = 5, OrderIds = new Guid[] { Guid.NewGuid(), Guid.NewGuid() } }); diff --git a/src/SampleBatch.Tests/Unit/BatchStateMachineTests.cs b/src/SampleBatch.Tests/Unit/BatchStateMachineTests.cs index b525dc7..b71ccd6 100644 --- a/src/SampleBatch.Tests/Unit/BatchStateMachineTests.cs +++ b/src/SampleBatch.Tests/Unit/BatchStateMachineTests.cs @@ -59,7 +59,7 @@ public async Task should_start() { BatchId = NewId.NextGuid(), Timestamp = DateTime.UtcNow, - BatchAction = BatchAction.CancelOrders, + BatchAction = BatchActionEnum.CancelOrders, ActiveThreshold = 5, OrderIds = new Guid[] { Guid.NewGuid(), Guid.NewGuid() } }); @@ -87,7 +87,7 @@ public async Task should_receive_and_wait() { BatchId = NewId.NextGuid(), Timestamp = DateTime.UtcNow, - BatchAction = BatchAction.CancelOrders, + BatchAction = BatchActionEnum.CancelOrders, ActiveThreshold = 5, OrderIds = new Guid[] { Guid.NewGuid(), Guid.NewGuid() }, DelayInSeconds = 60