diff --git a/src/Sample.Api/Startup.cs b/src/Sample.Api/Startup.cs index 7041db9..b190de2 100644 --- a/src/Sample.Api/Startup.cs +++ b/src/Sample.Api/Startup.cs @@ -60,6 +60,7 @@ public void ConfigureServices(IServiceCollection services) { cfg.Host(Configuration.GetConnectionString("AzureServiceBus")); + cfg.Publish(configurator => configurator.Exclude = true); cfg.Send(s => s.UseSessionIdFormatter(c => c.Message.OrderId.ToString("D"))); }); }); diff --git a/src/Sample.Contracts/OrderShipped.cs b/src/Sample.Contracts/OrderShipped.cs index 281b740..a6a3b7f 100644 --- a/src/Sample.Contracts/OrderShipped.cs +++ b/src/Sample.Contracts/OrderShipped.cs @@ -3,9 +3,13 @@ namespace Sample.Contracts using System; - public record OrderShipped + public record OrderShippedBase { public Guid OrderId { get; init; } public DateTimeOffset Timestamp { get; init; } } + + public record OrderShipped : OrderShippedBase + { + } } \ No newline at end of file diff --git a/src/Sample.Worker/Consumers/OrderShippedConsumer.cs b/src/Sample.Worker/Consumers/OrderShippedConsumer.cs new file mode 100644 index 0000000..8d1049c --- /dev/null +++ b/src/Sample.Worker/Consumers/OrderShippedConsumer.cs @@ -0,0 +1,25 @@ +using System.Threading.Tasks; +using MassTransit; +using Microsoft.Extensions.Logging; +using Sample.Contracts; + +namespace Sample.Worker.Consumers +{ + public class OrderShippedConsumer : + IConsumer + { + private readonly ILogger _logger; + + public OrderShippedConsumer(ILogger logger) + { + _logger = logger; + } + + public Task Consume(ConsumeContext context) + { + _logger.LogInformation("Order Shipped with OrderId: {OrderId}. SessionId {SessionId}.", + context.Message.OrderId, context.SessionId()); + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/Sample.Worker/Program.cs b/src/Sample.Worker/Program.cs index 23bd044..3f91f27 100644 --- a/src/Sample.Worker/Program.cs +++ b/src/Sample.Worker/Program.cs @@ -1,3 +1,5 @@ +using System; + namespace Sample.Worker { using System.Threading.Tasks; @@ -29,6 +31,10 @@ public static IHostBuilder CreateHostBuilder(string[] args) x.AddConsumer(); x.AddConsumer(); + x.AddConsumer(e => + { + e.UseTimeout(c => c.Timeout = TimeSpan.FromSeconds(10)); + }); x.AddSagaStateMachine() .MessageSessionRepository(); @@ -48,6 +54,19 @@ public static IHostBuilder CreateHostBuilder(string[] args) e.ConfigureConsumer(context); }); + cfg.Publish(configurator => configurator.Exclude = true); + cfg.SubscriptionEndpoint("order-shipped-consumer", e => + { + e.ConfigureConsumeTopology = false; + e.AutoStart = true; + e.AutoDeleteOnIdle = TimeSpan.FromDays(30); + e.MaxDeliveryCount = 2; + e.RequiresSession = true; + e.MaxConcurrentCalls = 8; //like AZ Function default. 8 Concurrent consumers handling 1 ServiceBus Session + e.PrefetchCount = 0; + e.ConfigureConsumer(context); + }); + cfg.ConfigureEndpoints(context); }); });