Skip to content

Commit

Permalink
Removed switch-case from ProcessBatchJobConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
shahabganji committed Jul 20, 2020
1 parent d01be29 commit 2e692b1
Show file tree
Hide file tree
Showing 18 changed files with 219 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/SampleBatch.Api/Controllers/BatchJobsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public async Task<ActionResult<Guid>> Post(int jobCount = 100, int activeThresho
{
BatchId = id,
InVar.Timestamp,
Action = BatchAction.CancelOrders,
Action = BatchActionEnum.CancelOrders,
OrderIds = orderIds.ToArray(),
ActiveThreshold = activeThreshold,
DelayInSeconds = delayInSeconds
Expand Down
73 changes: 14 additions & 59 deletions src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace SampleBatch.Components.Consumers
{
using System;
using System.Threading.Tasks;
using Contracts;
using Contracts.Enums;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;
Expand All @@ -24,64 +22,21 @@ public async Task Consume(ConsumeContext<ProcessBatchJob> context)
{
using (_log.BeginScope("ProcessBatchJob {BatchJobId}, {OrderId}", context.Message.BatchJobId, context.Message.OrderId))
{
var builder = new RoutingSlipBuilder(NewId.NextGuid());

switch (context.Message.Action)
var routingSlip = await context.Message.Action.SetupRoutingSlip(context, async builder =>
{
case BatchAction.CancelOrders:
builder.AddActivity(
"CancelOrder",
new Uri("queue:cancel-order_execute"),
new
{
context.Message.OrderId,
Reason = "Product discontinued"
});

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.ActivityFaulted,
RoutingSlipEventContents.None,
"CancelOrder",
x => x.Send<BatchJobFailed>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
context.Message.OrderId
}));
break;

case BatchAction.SuspendOrders:
builder.AddActivity(
"SuspendOrder",
new Uri("queue:suspend-order_execute"),
new {context.Message.OrderId});

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.ActivityFaulted,
RoutingSlipEventContents.None,
"SuspendOrder",
x => x.Send<BatchJobFailed>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
context.Message.OrderId
}));
break;
}

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.Completed,
x => x.Send<BatchJobCompleted>(new
{
context.Message.BatchJobId,
context.Message.BatchId
}));

await context.Execute(builder.Build());
await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.Completed,
x => x.Send<BatchJobCompleted>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
InVar.Timestamp
}));
});

await context.Execute(routingSlip);
}
}
}
}
}
4 changes: 1 addition & 3 deletions src/SampleBatch.Components/SampleBatchDbContext.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using Microsoft.EntityFrameworkCore;
using SampleBatch.Components.StateMachines;
using System;
using System.Collections.Generic;
using System.Text;


namespace SampleBatch.Components
{
Expand Down
10 changes: 5 additions & 5 deletions src/SampleBatch.Components/StateMachines/BatchState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public class BatchState :

public DateTime? UpdateTimestamp { get; set; }

public BatchAction? Action { get; set; }
public BatchActionEnum Action { get; set; }

/// <summary>
/// The maximum amount of active Jobs allowed to be processing. Typically an amount larger than your Job Consumer can handle concurrently, to allow for some additional prefetch while the Batch Saga dispatches more
/// </summary>
public int? ActiveThreshold { get; set; } = 20;
/// <summary>
/// The maximum amount of active Jobs allowed to be processing. Typically an amount larger than your Job Consumer can handle concurrently, to allow for some additional prefetch while the Batch Saga dispatches more
/// </summary>
public int? ActiveThreshold { get; set; } = 20;

public int? Total { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
{
using System;
using System.Collections.Generic;
using System.Linq;
using Common;
using Contracts.Enums;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;


class BatchStateEntityConfiguration :
Expand All @@ -22,8 +22,9 @@ public void Configure(EntityTypeBuilder<BatchState> builder)

builder.Property(c => c.CurrentState).IsRequired();

builder.Property(c => c.Action)
.HasConversion(new EnumToStringConverter<BatchAction>());
builder.Property(p => p.Action)
.HasConversion(v => v.Value, i => BatchActionEnum.List().FirstOrDefault(e => e.Value == i));


builder.Property(c => c.UnprocessedOrderIds)
.HasConversion(new JsonValueConverter<Stack<Guid>>())
Expand All @@ -34,4 +35,4 @@ public void Configure(EntityTypeBuilder<BatchState> builder)
.Metadata.SetValueComparer(new JsonValueComparer<Dictionary<Guid, Guid>>());
}
}
}
}
2 changes: 1 addition & 1 deletion src/SampleBatch.Components/StateMachines/JobState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class JobState :

public DateTime? UpdateTimestamp { get; set; }

public BatchAction Action { get; set; }
public BatchActionEnum Action { get; set; }

public string ExceptionMessage { get; set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
namespace SampleBatch.Components.StateMachines
{
using System.Linq;
using Contracts.Enums;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;


class JobStateEntityConfiguration :
Expand All @@ -19,8 +19,8 @@ public void Configure(EntityTypeBuilder<JobState> builder)

builder.Property(c => c.CurrentState).IsRequired();

builder.Property(c => c.Action)
.HasConversion(new EnumToStringConverter<BatchAction>());
builder.Property(p => p.Action)
.HasConversion(v => v.Value, i => BatchActionEnum.List().FirstOrDefault(e => e.Value == i));
}
}
}
}
2 changes: 1 addition & 1 deletion src/SampleBatch.Contracts/BatchJobReceived.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public interface BatchJobReceived
Guid BatchId { get; }
Guid OrderId { get; }
DateTime Timestamp { get; }
BatchAction Action { get; }
BatchActionEnum Action { get; }
}
}
2 changes: 1 addition & 1 deletion src/SampleBatch.Contracts/BatchReceived.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface BatchReceived
{
Guid BatchId { get; }
DateTime Timestamp { get; }
BatchAction Action { get; }
BatchActionEnum Action { get; }
Guid[] OrderIds { get; }
int ActiveThreshold { get; }

Expand Down
8 changes: 0 additions & 8 deletions src/SampleBatch.Contracts/Enums/BatchAction.cs

This file was deleted.

50 changes: 50 additions & 0 deletions src/SampleBatch.Contracts/Enums/BatchActionEnum.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace SampleBatch.Contracts.Enums
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Converter;
using Internal;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;
using Newtonsoft.Json;


[JsonConverter(typeof(BatchActionEnumConverter))]
public abstract class BatchActionEnum
{
public static readonly BatchActionEnum CancelOrders = new CancelOrdersEnum();
public static readonly BatchActionEnum SuspendOrders = new SuspendOrdersEnum();

public int Value { get; private set; }
public string Name { get; private set; }

public static IEnumerable<BatchActionEnum> List()
{
yield return CancelOrders;
yield return SuspendOrders;
}

protected BatchActionEnum(int value, string name)
{
Value = value;
Name = name;
}

public async Task<RoutingSlip> SetupRoutingSlip(ConsumeContext<ProcessBatchJob> context, Func<RoutingSlipBuilder, Task> commonAction)
{
var builder = new RoutingSlipBuilder(NewId.NextGuid());

await SetupRoutingSlip(builder, context);

await commonAction?.Invoke(builder);

return builder.Build();

}

protected abstract Task SetupRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<ProcessBatchJob> context);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace SampleBatch.Contracts.Enums.Converter
{
using System;
using Internal;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;


public class BaseSpecifiedConcreteClassConverter : DefaultContractResolver
{
protected override JsonConverter ResolveContractConverter(Type objectType)
{
if (typeof(BatchActionEnum).IsAssignableFrom(objectType) && !objectType.IsAbstract)
return null; // pretend TableSortRuleConvert is not specified (thus avoiding a stack overflow)
return base.ResolveContractConverter(objectType);
}
}

public class BatchActionEnumConverter : JsonConverter
{
static readonly JsonSerializerSettings SpecifiedSubclassConversion = new JsonSerializerSettings() { ContractResolver = new BaseSpecifiedConcreteClassConverter() };

public override bool CanConvert(Type objectType)
{
return (objectType == typeof(BatchActionEnum));
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
JObject jo = JObject.Load(reader);
switch (jo["value"].Value<int>())
{
case 1:
return JsonConvert.DeserializeObject<CancelOrdersEnum>(jo.ToString(), SpecifiedSubclassConversion);
case 2:
return JsonConvert.DeserializeObject<SuspendOrdersEnum>(jo.ToString(), SpecifiedSubclassConversion);
default:
throw new Exception();
}
throw new NotImplementedException();
}

public override bool CanWrite {
get { return false; }
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
throw new NotImplementedException(); // won't be called because CanWrite returns false
}
}
}
41 changes: 41 additions & 0 deletions src/SampleBatch.Contracts/Enums/Internal/CancelOrdersEnum.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace SampleBatch.Contracts.Enums.Internal
{
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;


class CancelOrdersEnum : BatchActionEnum
{
public CancelOrdersEnum()
: base(1, "Cancel Orders")
{
}

protected override async Task SetupRoutingSlip( RoutingSlipBuilder builder, ConsumeContext<ProcessBatchJob> context)
{
builder.AddActivity(
"CancelOrder",
new Uri("queue:cancel-order_execute"),
new
{
context.Message.OrderId,
Reason = "Product discontinued"
});

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.ActivityFaulted,
RoutingSlipEventContents.None,
"CancelOrder",
x => x.Send<BatchJobFailed>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
context.Message.OrderId
}));
}
}
}
Loading

0 comments on commit 2e692b1

Please sign in to comment.