Skip to content

Commit

Permalink
Merge pull request #762 from Particular/port-improvements
Browse files Browse the repository at this point in the history
Ports recent improvements to 3.1.3
  • Loading branch information
SzymonPobiega authored Jan 27, 2023
2 parents 2c6830b + 3889b5b commit 95883f5
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
</ItemGroup>

<ItemGroup Label="Force the latest version of the transitive dependencies">
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.12.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<ItemGroup>
<PackageReference Include="McMaster.Extensions.CommandLineUtils" Version="4.0.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.12.0" />
<PackageReference Include="Azure.Identity" Version="1.8.0" />
<PackageReference Include="Particular.Packaging" Version="2.2.0" PrivateAssets="All" />
</ItemGroup>
Expand Down
10 changes: 10 additions & 0 deletions src/CommandLine/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ static int Main(string[] args)

createCommand.OnExecuteAsync(async ct =>
{
// Unfortunately the default value cannot be set outside the execute because it would then
// trigger the validation. There seems to be no way in the command handling library to
// differentiate defaults from user inputs we set a default for the topicName here.
if (!topicName.HasValue() && !topicToPublishTo.HasValue() && !topicToSubscribeOn.HasValue())
{
topicName.DefaultValue = Topic.DefaultTopicName;
}

await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Create(client, name, topicName, topicToPublishTo, topicToSubscribeOn, subscriptionName, size, partitioning));

Console.WriteLine($"Endpoint '{name.Value}' is ready.");
Expand All @@ -83,6 +91,7 @@ static int Main(string[] args)
subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = subscribeCommand.Option("-t|--topic", "Topic name to subscribe on (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = subscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

Expand All @@ -103,6 +112,7 @@ static int Main(string[] args)
unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = unsubscribeCommand.Option("-t|--topic", "Topic name to unsubscribe from (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

Expand Down
2 changes: 1 addition & 1 deletion src/CommandLine/Topic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ static class Topic
{
public static Task Create(ServiceBusAdministrationClient client, CommandOption topicName, CommandOption<int> size, CommandOption partitioning)
{
var topicNameToUse = topicName.HasValue() ? topicName.Value() : DefaultTopicName;
var topicNameToUse = topicName.Value();

var options = new CreateTopicOptions(topicNameToUse)
{
Expand Down
18 changes: 18 additions & 0 deletions src/CommandLineTests/CommandLineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,28 @@ public class CommandLineTests
const string EndpointName = "cli-queue";
const string QueueName = EndpointName;
const string TopicName = "cli-topic";
const string DefaultTopicName = "bundle-1";
const string HierarchyTopicName = "cli-topic-sub";
const string SubscriptionName = QueueName;
const string HierarchySubscriptionName = $"forwardTo-{HierarchyTopicName}";

[Test]
public async Task Create_endpoint_without_specifying_a_topic()
{
await DeleteQueue(QueueName);
await DeleteTopic(DefaultTopicName);

var (output, error, exitCode) = await Execute($"endpoint create {EndpointName}");

Assert.AreEqual(0, exitCode);
Assert.IsTrue(error == string.Empty);
Assert.IsFalse(output.Contains("skipping"));

await VerifyQueue(QueueName);
await VerifyTopic(DefaultTopicName);
await VerifySubscriptionContainsOnlyDefaultRule(DefaultTopicName, SubscriptionName);
}

[Test]
public async Task Create_endpoint_when_there_are_no_entities()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</ItemGroup>

<ItemGroup Label="Force the latest version of the transitive dependencies">
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.12.0" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion src/Transport/NServiceBus.Transport.AzureServiceBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="[7.11.1, 8.0.0)" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="[7.12.0, 8.0.0)" />
<PackageReference Include="NServiceBus" Version="[8.0.0, 9.0.0)" />
<PackageReference Include="Particular.Packaging" Version="2.2.0" PrivateAssets="All" />
</ItemGroup>
Expand Down
60 changes: 35 additions & 25 deletions src/Transport/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,7 @@ public Task Initialize(

public async Task StartReceive(CancellationToken cancellationToken = default)
{
var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier;

if (transportSettings.PrefetchCount.HasValue)
{
prefetchCount = transportSettings.PrefetchCount.Value;
}
int prefetchCount = CalculatePrefetchCount();

var receiveOptions = new ServiceBusProcessorOptions
{
Expand All @@ -104,12 +99,37 @@ public async Task StartReceive(CancellationToken cancellationToken = default)

messageProcessingCancellationTokenSource = new CancellationTokenSource();

circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{receiveSettings.ReceiveAddress}'", transportSettings.TimeToWaitBeforeTriggeringCircuitBreaker, ex => criticalErrorAction("Failed to receive message from Azure Service Bus.", ex, messageProcessingCancellationTokenSource.Token));
circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{receiveSettings.ReceiveAddress}'",
transportSettings.TimeToWaitBeforeTriggeringCircuitBreaker, ex =>
{
criticalErrorAction("Failed to receive message from Azure Service Bus.", ex,
messageProcessingCancellationTokenSource.Token);
}, () =>
{
//We don't have to update the prefetch count since we are failing to receive anyway.
processor.UpdateConcurrency(1);
},
() =>
{
processor.UpdateConcurrency(limitations.MaxConcurrency);
});

await processor.StartProcessingAsync(cancellationToken)
.ConfigureAwait(false);
}

int CalculatePrefetchCount()
{
var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier;

if (transportSettings.PrefetchCount.HasValue)
{
prefetchCount = transportSettings.PrefetchCount.Value;
}

return prefetchCount;
}

#pragma warning disable PS0018
async Task OnProcessMessage(ProcessMessageEventArgs arg)
#pragma warning restore PS0018
Expand Down Expand Up @@ -207,26 +227,16 @@ await circuitBreaker.Failure(processErrorEventArgs.Exception, processErrorEventA
.ConfigureAwait(false);
}


public async Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationToken cancellationToken = default)
public Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationToken cancellationToken = default)
{
limitations = newLimitations;
if (transportSettings.PrefetchCount.HasValue)
{
// For all users that have set a predefined fixed prefetch count we are adjusting the concurrency
// by using what the SDK provides since the prefetch count is always fixed.
processor.UpdateConcurrency(limitations.MaxConcurrency);
}
else
{
// For all other cases the users is using either the default multiplier or a defined multiplier
// that sets the prefetch count in accordance of the maximum concurrency. In those scenarios we cannot
// use UpdateConcurrency because that would not adjust the prefetch count to the new desired values
// therefore we are stopping and restarting (which also creates a new underlying AMQP link that will have
// the new prefetch count settings.
await StopReceive(cancellationToken).ConfigureAwait(false);
await StartReceive(cancellationToken).ConfigureAwait(false);
}

processor.UpdateConcurrency(limitations.MaxConcurrency);

int prefetchCount = CalculatePrefetchCount();
processor.UpdatePrefetchCount(prefetchCount);

return Task.CompletedTask;
}

public async Task StopReceive(CancellationToken cancellationToken = default)
Expand Down
19 changes: 17 additions & 2 deletions src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@

class RepeatedFailuresOverTimeCircuitBreaker
{
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action<Exception> triggerAction)
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering,
Action<Exception> triggerAction,
Action armedAction,
Action disarmedAction)
{
this.name = name;
this.triggerAction = triggerAction;
this.armedAction = armedAction;
this.disarmedAction = disarmedAction;
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;

timer = new Timer(CircuitBreakerTriggered);
Expand All @@ -27,6 +32,8 @@ public void Success()

timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
disarmedAction();
triggered = false;
}

public Task Failure(Exception exception, CancellationToken cancellationToken = default)
Expand All @@ -36,11 +43,15 @@ public Task Failure(Exception exception, CancellationToken cancellationToken = d

if (newValue == 1)
{
armedAction();
timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
}

return Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
//If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus
var delay = triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1);

return Task.Delay(delay, cancellationToken);
}

public void Dispose()
Expand All @@ -53,17 +64,21 @@ void CircuitBreakerTriggered(object state)
if (Interlocked.Read(ref failureCount) > 0)
{
Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
triggered = true;
triggerAction(lastException);
}
}

long failureCount;
volatile bool triggered;
Exception lastException;

readonly string name;
readonly Timer timer;
readonly TimeSpan timeToWaitBeforeTriggering;
readonly Action<Exception> triggerAction;
readonly Action armedAction;
readonly Action disarmedAction;

static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.12.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="8.0.0" />
Expand Down

0 comments on commit 95883f5

Please sign in to comment.