diff --git a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj index 42ec8afd..1148e2d7 100644 --- a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj +++ b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj @@ -19,7 +19,7 @@ - + \ No newline at end of file diff --git a/src/CommandLine/NServiceBus.Transport.AzureServiceBus.CommandLine.csproj b/src/CommandLine/NServiceBus.Transport.AzureServiceBus.CommandLine.csproj index 22560c29..da2b02f7 100644 --- a/src/CommandLine/NServiceBus.Transport.AzureServiceBus.CommandLine.csproj +++ b/src/CommandLine/NServiceBus.Transport.AzureServiceBus.CommandLine.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/CommandLine/Program.cs b/src/CommandLine/Program.cs index 56ead4d8..d8b5fb3f 100644 --- a/src/CommandLine/Program.cs +++ b/src/CommandLine/Program.cs @@ -68,6 +68,14 @@ static int Main(string[] args) createCommand.OnExecuteAsync(async ct => { + // Unfortunately the default value cannot be set outside the execute because it would then + // trigger the validation. There seems to be no way in the command handling library to + // differentiate defaults from user inputs we set a default for the topicName here. + if (!topicName.HasValue() && !topicToPublishTo.HasValue() && !topicToSubscribeOn.HasValue()) + { + topicName.DefaultValue = Topic.DefaultTopicName; + } + await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Create(client, name, topicName, topicToPublishTo, topicToSubscribeOn, subscriptionName, size, partitioning)); Console.WriteLine($"Endpoint '{name.Value}' is ready."); @@ -83,6 +91,7 @@ static int Main(string[] args) subscribeCommand.AddOption(connectionString); subscribeCommand.AddOption(fullyQualifiedNamespace); var topicName = subscribeCommand.Option("-t|--topic", "Topic name to subscribe on (defaults to 'bundle-1')", CommandOptionType.SingleValue); + topicName.DefaultValue = Topic.DefaultTopicName; var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue); var shortenedRuleName = subscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue); @@ -103,6 +112,7 @@ static int Main(string[] args) unsubscribeCommand.AddOption(connectionString); unsubscribeCommand.AddOption(fullyQualifiedNamespace); var topicName = unsubscribeCommand.Option("-t|--topic", "Topic name to unsubscribe from (defaults to 'bundle-1')", CommandOptionType.SingleValue); + topicName.DefaultValue = Topic.DefaultTopicName; var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue); var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue); diff --git a/src/CommandLine/Topic.cs b/src/CommandLine/Topic.cs index 08dda2e6..71fc2419 100644 --- a/src/CommandLine/Topic.cs +++ b/src/CommandLine/Topic.cs @@ -8,7 +8,7 @@ static class Topic { public static Task Create(ServiceBusAdministrationClient client, CommandOption topicName, CommandOption size, CommandOption partitioning) { - var topicNameToUse = topicName.HasValue() ? topicName.Value() : DefaultTopicName; + var topicNameToUse = topicName.Value(); var options = new CreateTopicOptions(topicNameToUse) { diff --git a/src/CommandLineTests/CommandLineTests.cs b/src/CommandLineTests/CommandLineTests.cs index 7c2069ac..f3960862 100644 --- a/src/CommandLineTests/CommandLineTests.cs +++ b/src/CommandLineTests/CommandLineTests.cs @@ -14,10 +14,28 @@ public class CommandLineTests const string EndpointName = "cli-queue"; const string QueueName = EndpointName; const string TopicName = "cli-topic"; + const string DefaultTopicName = "bundle-1"; const string HierarchyTopicName = "cli-topic-sub"; const string SubscriptionName = QueueName; const string HierarchySubscriptionName = $"forwardTo-{HierarchyTopicName}"; + [Test] + public async Task Create_endpoint_without_specifying_a_topic() + { + await DeleteQueue(QueueName); + await DeleteTopic(DefaultTopicName); + + var (output, error, exitCode) = await Execute($"endpoint create {EndpointName}"); + + Assert.AreEqual(0, exitCode); + Assert.IsTrue(error == string.Empty); + Assert.IsFalse(output.Contains("skipping")); + + await VerifyQueue(QueueName); + await VerifyTopic(DefaultTopicName); + await VerifySubscriptionContainsOnlyDefaultRule(DefaultTopicName, SubscriptionName); + } + [Test] public async Task Create_endpoint_when_there_are_no_entities() { diff --git a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj index 8d7804d1..4b41584b 100644 --- a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj +++ b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj @@ -22,7 +22,7 @@ - + \ No newline at end of file diff --git a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj index 24a2ae0c..ef8e2a1a 100644 --- a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj +++ b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 64f60344..4bf535e0 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -75,12 +75,7 @@ public Task Initialize( public async Task StartReceive(CancellationToken cancellationToken = default) { - var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier; - - if (transportSettings.PrefetchCount.HasValue) - { - prefetchCount = transportSettings.PrefetchCount.Value; - } + int prefetchCount = CalculatePrefetchCount(); var receiveOptions = new ServiceBusProcessorOptions { @@ -104,12 +99,37 @@ public async Task StartReceive(CancellationToken cancellationToken = default) messageProcessingCancellationTokenSource = new CancellationTokenSource(); - circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{receiveSettings.ReceiveAddress}'", transportSettings.TimeToWaitBeforeTriggeringCircuitBreaker, ex => criticalErrorAction("Failed to receive message from Azure Service Bus.", ex, messageProcessingCancellationTokenSource.Token)); + circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{receiveSettings.ReceiveAddress}'", + transportSettings.TimeToWaitBeforeTriggeringCircuitBreaker, ex => + { + criticalErrorAction("Failed to receive message from Azure Service Bus.", ex, + messageProcessingCancellationTokenSource.Token); + }, () => + { + //We don't have to update the prefetch count since we are failing to receive anyway. + processor.UpdateConcurrency(1); + }, + () => + { + processor.UpdateConcurrency(limitations.MaxConcurrency); + }); await processor.StartProcessingAsync(cancellationToken) .ConfigureAwait(false); } + int CalculatePrefetchCount() + { + var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier; + + if (transportSettings.PrefetchCount.HasValue) + { + prefetchCount = transportSettings.PrefetchCount.Value; + } + + return prefetchCount; + } + #pragma warning disable PS0018 async Task OnProcessMessage(ProcessMessageEventArgs arg) #pragma warning restore PS0018 @@ -207,26 +227,16 @@ await circuitBreaker.Failure(processErrorEventArgs.Exception, processErrorEventA .ConfigureAwait(false); } - - public async Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationToken cancellationToken = default) + public Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationToken cancellationToken = default) { limitations = newLimitations; - if (transportSettings.PrefetchCount.HasValue) - { - // For all users that have set a predefined fixed prefetch count we are adjusting the concurrency - // by using what the SDK provides since the prefetch count is always fixed. - processor.UpdateConcurrency(limitations.MaxConcurrency); - } - else - { - // For all other cases the users is using either the default multiplier or a defined multiplier - // that sets the prefetch count in accordance of the maximum concurrency. In those scenarios we cannot - // use UpdateConcurrency because that would not adjust the prefetch count to the new desired values - // therefore we are stopping and restarting (which also creates a new underlying AMQP link that will have - // the new prefetch count settings. - await StopReceive(cancellationToken).ConfigureAwait(false); - await StartReceive(cancellationToken).ConfigureAwait(false); - } + + processor.UpdateConcurrency(limitations.MaxConcurrency); + + int prefetchCount = CalculatePrefetchCount(); + processor.UpdatePrefetchCount(prefetchCount); + + return Task.CompletedTask; } public async Task StopReceive(CancellationToken cancellationToken = default) diff --git a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs index 0c4bbfda..226cebfc 100644 --- a/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs +++ b/src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs @@ -7,10 +7,15 @@ class RepeatedFailuresOverTimeCircuitBreaker { - public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action triggerAction) + public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, + Action triggerAction, + Action armedAction, + Action disarmedAction) { this.name = name; this.triggerAction = triggerAction; + this.armedAction = armedAction; + this.disarmedAction = disarmedAction; this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering; timer = new Timer(CircuitBreakerTriggered); @@ -27,6 +32,8 @@ public void Success() timer.Change(Timeout.Infinite, Timeout.Infinite); Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name); + disarmedAction(); + triggered = false; } public Task Failure(Exception exception, CancellationToken cancellationToken = default) @@ -36,11 +43,15 @@ public Task Failure(Exception exception, CancellationToken cancellationToken = d if (newValue == 1) { + armedAction(); timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering); Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name); } - return Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); + //If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus + var delay = triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1); + + return Task.Delay(delay, cancellationToken); } public void Dispose() @@ -53,17 +64,21 @@ void CircuitBreakerTriggered(object state) if (Interlocked.Read(ref failureCount) > 0) { Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name); + triggered = true; triggerAction(lastException); } } long failureCount; + volatile bool triggered; Exception lastException; readonly string name; readonly Timer timer; readonly TimeSpan timeToWaitBeforeTriggering; readonly Action triggerAction; + readonly Action armedAction; + readonly Action disarmedAction; static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1); static readonly ILog Logger = LogManager.GetLogger(); diff --git a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj index 8be8d7eb..54c5045f 100644 --- a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj +++ b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj @@ -9,7 +9,7 @@ - +