Skip to content

Commit

Permalink
Backport 842 to release 3.2 (#843)
Browse files Browse the repository at this point in the history
Co-authored-by: lailabougria <[email protected]>
  • Loading branch information
mikeminutillo and lailabougria authored Jun 30, 2023
1 parent b85b2b9 commit 14fd693
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 12 deletions.
18 changes: 6 additions & 12 deletions src/Tests/Sending/MessageDispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public async Task Should_use_connection_information_of_existing_service_bus_tran
}

[Test]
public async Task Should_split_into_batches_of_max_hundred_when_transactions_used()
public void Should_throw_when_detecting_more_than_hundred_messages_when_transactions_used()
{
var defaultClient = new FakeServiceBusClient();
var defaultSender = new FakeSender();
Expand All @@ -415,8 +415,9 @@ public async Task Should_split_into_batches_of_max_hundred_when_transactions_use

var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic");

var operations = new List<TransportOperation>(150);
for (int i = 0; i < 150; i++)
var nrOfMessages = 150;
var operations = new List<TransportOperation>(nrOfMessages);
for (int i = 0; i < nrOfMessages; i++)
{
operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}",
new Dictionary<string, string> { { "Number", i.ToString() } },
Expand All @@ -429,15 +430,8 @@ public async Task Should_split_into_batches_of_max_hundred_when_transactions_use
var azureServiceBusTransaction = new AzureServiceBusTransportTransaction(transactionalClient,
"SomePartitionKey", new TransactionOptions());

await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction);

Assert.That(transactionalSender.BatchSentMessages, Has.Count.EqualTo(3));
var firstBatch = transactionalSender[transactionalSender.BatchSentMessages.ElementAt(0)];
var secondBatch = transactionalSender[transactionalSender.BatchSentMessages.ElementAt(1)];
var thirdBatch = transactionalSender[transactionalSender.BatchSentMessages.ElementAt(2)];
Assert.That(firstBatch, Has.Count.EqualTo(100));
Assert.That(secondBatch, Has.Count.EqualTo(25));
Assert.That(thirdBatch, Has.Count.EqualTo(25));
var ex = Assert.ThrowsAsync<Exception>(async () => await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction));
Assert.AreEqual($"The number of outgoing messages ({nrOfMessages}) exceeds the limits permitted by Azure Service Bus ({100}) in a single transaction", ex.Message);
}

[Test]
Expand Down
2 changes: 2 additions & 0 deletions src/Transport/AzureServiceBusTransportTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ internal Transaction? Transaction
}
}

internal bool HasTransaction => transaction != null || transactionOptions.HasValue;

internal ServiceBusClient? ServiceBusClient
{
get;
Expand Down
9 changes: 9 additions & 0 deletions src/Transport/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace NServiceBus.Transport.AzureServiceBus

class MessageDispatcher : IMessageDispatcher
{
const int MaxMessageThresholdForTransaction = 100;

static readonly ILog Log = LogManager.GetLogger<MessageDispatcher>();
static readonly Dictionary<string, List<IOutgoingTransportOperation>> emptyDestinationAndOperations = new();

Expand Down Expand Up @@ -40,6 +42,7 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa

Dictionary<string, List<IOutgoingTransportOperation>>? isolatedOperationsPerDestination = null;
Dictionary<string, List<IOutgoingTransportOperation>>? defaultOperationsPerDestination = null;
var numberOfDefaultOperations = 0;
var numberOfIsolatedOperations = 0;
var numberOfDefaultOperationDestinations = 0;

Expand All @@ -49,6 +52,7 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
switch (operation.RequiredDispatchConsistency)
{
case DispatchConsistency.Default:
numberOfDefaultOperations++;
defaultOperationsPerDestination ??=
new Dictionary<string, List<IOutgoingTransportOperation>>(StringComparer.OrdinalIgnoreCase);

Expand Down Expand Up @@ -82,6 +86,11 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
}
}

if (azureServiceBusTransaction is { HasTransaction: true } && numberOfDefaultOperations > MaxMessageThresholdForTransaction)
{
throw new Exception($"The number of outgoing messages ({numberOfDefaultOperations}) exceeds the limits permitted by Azure Service Bus ({MaxMessageThresholdForTransaction}) in a single transaction");
}

var concurrentDispatchTasks =
new List<Task>(numberOfIsolatedOperations + numberOfDefaultOperationDestinations);
AddIsolatedOperationsTo(concurrentDispatchTasks, isolatedOperationsPerDestination ?? emptyDestinationAndOperations, transaction, azureServiceBusTransaction, cancellationToken);
Expand Down

0 comments on commit 14fd693

Please sign in to comment.