Skip to content

Commit

Permalink
Merge pull request #1019 from Particular/try-add-limit
Browse files Browse the repository at this point in the history
Fallback to individual sends when batching is not possible
  • Loading branch information
PhilBastian authored Aug 7, 2024
2 parents 89f96d6 + 5d81c7c commit 9e9a8a1
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 58 deletions.
103 changes: 73 additions & 30 deletions src/Tests/Sending/MessageDispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,28 +336,6 @@ public async Task Should_allow_mixing_operations()
Assert.That(someTopicSenderBatchContent, Has.Count.EqualTo(1));
}

[Test]
public void Should_throw_when_batch_size_exceeded()
{
var client = new FakeServiceBusClient();
var sender = new FakeSender();
client.Senders["SomeDestination"] = sender;

sender.TryAdd = _ => false;

var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), "sometopic");

var operation1 =
new TransportOperation(new OutgoingMessage("SomeId",
[],
ReadOnlyMemory<byte>.Empty),
new UnicastAddressTag("SomeDestination"),
[],
DispatchConsistency.Default);

Assert.ThrowsAsync<ServiceBusException>(async () => await dispatcher.Dispatch(new TransportOperations(operation1), new TransportTransaction()));
}

[Test]
public async Task Should_use_connection_information_of_existing_service_bus_transaction()
{
Expand Down Expand Up @@ -435,7 +413,7 @@ public void Should_throw_when_detecting_more_than_hundred_messages_when_transact
}

[Test]
public async Task Should_split_into_batches_of_max_4500_when_no_transactions_used()
public async Task Should_split_into_multiple_batches_according_to_the_sdk()
{
var defaultClient = new FakeServiceBusClient();
var defaultSender = new FakeSender();
Expand All @@ -444,7 +422,7 @@ public async Task Should_split_into_batches_of_max_4500_when_no_transactions_use
bool firstTime = true;
defaultSender.TryAdd = msg =>
{
if ((string)msg.ApplicationProperties["Number"] != "4550" || !firstTime)
if ((string)msg.ApplicationProperties["Number"] != "150" || !firstTime)
{
return true;
}
Expand All @@ -455,8 +433,8 @@ public async Task Should_split_into_batches_of_max_4500_when_no_transactions_use

var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic");

var operations = new List<TransportOperation>(4600);
for (int i = 0; i < 4600; i++)
var operations = new List<TransportOperation>(200);
for (int i = 0; i < 200; i++)
{
operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}",
new Dictionary<string, string> { { "Number", i.ToString() } },
Expand All @@ -470,13 +448,78 @@ public async Task Should_split_into_batches_of_max_4500_when_no_transactions_use

await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction);

Assert.That(defaultSender.BatchSentMessages, Has.Count.EqualTo(3));
Assert.That(defaultSender.BatchSentMessages, Has.Count.EqualTo(2));
var firstBatch = defaultSender[defaultSender.BatchSentMessages.ElementAt(0)];
var secondBatch = defaultSender[defaultSender.BatchSentMessages.ElementAt(1)];
var thirdBatch = defaultSender[defaultSender.BatchSentMessages.ElementAt(2)];
Assert.That(firstBatch, Has.Count.EqualTo(4500));
Assert.That(firstBatch, Has.Count.EqualTo(150));
Assert.That(secondBatch, Has.Count.EqualTo(50));
Assert.That(thirdBatch, Has.Count.EqualTo(50));
}

[Test]
public async Task Should_fallback_to_individual_sends_when_messages_cannot_be_added_to_batch()
{
var defaultClient = new FakeServiceBusClient();
var defaultSender = new FakeSender();
defaultClient.Senders["SomeDestination"] = defaultSender;

defaultSender.TryAdd = msg => false;

var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic");

var operations = new List<TransportOperation>(5);
for (int i = 0; i < 5; i++)
{
operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}",
new Dictionary<string, string> { { "Number", i.ToString() } },
ReadOnlyMemory<byte>.Empty),
new UnicastAddressTag("SomeDestination"),
[],
DispatchConsistency.Default));
}

var azureServiceBusTransaction = new AzureServiceBusTransportTransaction();

await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction);

Assert.That(defaultSender.BatchSentMessages, Has.Count.Zero);
Assert.That(defaultSender.IndividuallySentMessages, Has.Count.EqualTo(5));
}

[Test]
public async Task Should_fallback_to_individual_send_when_a_message_cannot_be_added_to_a_batch_but_batch_all_others()
{
var defaultClient = new FakeServiceBusClient();
var defaultSender = new FakeSender();
defaultClient.Senders["SomeDestination"] = defaultSender;

defaultSender.TryAdd = msg =>
{
return (string)msg.ApplicationProperties["Number"] switch
{
"4" or "7" => false,
_ => true,
};
};

var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic");

var operations = new List<TransportOperation>(5);
for (int i = 0; i < 10; i++)
{
operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}",
new Dictionary<string, string> { { "Number", i.ToString() } },
ReadOnlyMemory<byte>.Empty),
new UnicastAddressTag("SomeDestination"),
[],
DispatchConsistency.Default));
}

var azureServiceBusTransaction = new AzureServiceBusTransportTransaction();

await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction);

Assert.That(defaultSender.BatchSentMessages, Has.Count.EqualTo(3));
Assert.That(defaultSender.IndividuallySentMessages, Has.Count.EqualTo(2));
}

[Test]
Expand Down
61 changes: 33 additions & 28 deletions src/Transport/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ void AddBatchedOperationsTo(List<Task> dispatchTasks,
var operations = destinationAndOperations.Value;

var messagesToSend = new Queue<ServiceBusMessage>(operations.Count);
// We assume the majority of the messages will be batched and only a few will be sent individually
// and therefore it is OK in those rare cases for the list to grow.
var messagesTooLargeToBeBatched = new List<ServiceBusMessage>(0);
foreach (var operation in operations)
{
var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
Expand All @@ -128,25 +131,21 @@ void AddBatchedOperationsTo(List<Task> dispatchTasks,
}
// Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet
// doing the access as late as possible but still on the synchronous path.
dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, cancellationToken));
dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, messagesTooLargeToBeBatched, cancellationToken));

foreach (var message in messagesTooLargeToBeBatched)
{
dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, message, cancellationToken));
}
}
}

async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, Queue<ServiceBusMessage> messagesToSend, CancellationToken cancellationToken)
async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction,
Queue<ServiceBusMessage> messagesToSend, List<ServiceBusMessage> messagesTooLargeToBeBatched,
CancellationToken cancellationToken)
{
var messageCount = messagesToSend.Count;
int batchCount = 0;
var sender = messageSenderRegistry.GetMessageSender(destination, client);
// There are two limits for batching that unfortunately are not enforced over TryAdd.
//
// Limit 1: For transactional sends you cannot add more than 100 messages into the same batch. This limit
// is enforced when the batch is attempted to be sent.
//
// Limit 2: For non-transactional sends you cannot add more than 4500 messages into the same batch. This limit
// is enforced when the batch is attempted to be sent. There are plans to incorporate this limit into
// the TryAdd logic, see https://github.com/Azure/azure-sdk-for-net/issues/21451. Even though with all
// the headers we will probably never reach 4500 messages per batch this upper limit was added as a precaution
int maxItemsPerBatch = transaction == null ? 4500 : 100;
while (messagesToSend.Count > 0)
{
using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(cancellationToken)
Expand All @@ -158,28 +157,34 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli
logBuilder = new StringBuilder();
}

var peekedMessage = messagesToSend.Peek();
if (messageBatch.TryAddMessage(peekedMessage))
var dequeueMessage = messagesToSend.Dequeue();
// In this case the batch is fresh and doesn't have any messages yet. If TryAdd returns false
// we know the message can never be added to any batch and therefore we collect it to be sent
// individually.
if (messageBatch.TryAddMessage(dequeueMessage))
{
var added = messagesToSend.Dequeue();
if (Log.IsDebugEnabled)
{
added.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId);
logBuilder!.Append($"{messageId ?? added.MessageId},");
dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId);
logBuilder!.Append($"{messageId ?? dequeueMessage.MessageId},");
}
}
else
{
peekedMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId);
var message =
@$"Unable to add the message '#{messageCount - messagesToSend.Count}' with message id '{messageId ?? peekedMessage.MessageId}' to the the batch '#{batchCount}'.
The message may be too large, or the batch size has reached the maximum allowed messages per batch for the current tier selected for the namespace '{sender.FullyQualifiedNamespace}'.
To mitigate this problem, either reduce the message size by using the data bus, upgrade to a higher Service Bus tier, or increase the maximum message size.
If the maximum message size is increased, the endpoint must be restarted for the change to take effect.";
throw new ServiceBusException(message, ServiceBusFailureReason.MessageSizeExceeded);
if (Log.IsDebugEnabled)
{
dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId);
Log.Debug($"Message '{messageId ?? dequeueMessage.MessageId}' is too large for the batch '{batchCount}' and will be sent individually to destination {destination}.");
}
messagesTooLargeToBeBatched.Add(dequeueMessage);
continue;
}

while (messagesToSend.Count > 0 && messageBatch.Count < maxItemsPerBatch && messageBatch.TryAddMessage(messagesToSend.Peek()))
// Trying to add as many messages as we can to the batch. TryAdd might return false due to the batch being full
// or the message being too large. In the case when the message is too large for the batch the next iteration
// will try to add it to a fresh batch and if that fails too we will add it to the list of messages that couldn't be sent
// trying to attempt to send them individually.
while (messagesToSend.Count > 0 && messageBatch.TryAddMessage(messagesToSend.Peek()))
{
var added = messagesToSend.Dequeue();
if (Log.IsDebugEnabled)
Expand Down Expand Up @@ -228,12 +233,12 @@ void AddIsolatedOperationsTo(List<Task> dispatchTasks,
{
var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
operation.ApplyCustomizationToOutgoingNativeMessage(message, transportTransaction, Log);
dispatchTasks.Add(DispatchIsolatedForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken));
dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken));
}
}
}

async Task DispatchIsolatedForDestination(string destination, ServiceBusClient? client, Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken)
async Task DispatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken)
{
var sender = messageSenderRegistry.GetMessageSender(destination, client);
// Making sure we have a suppress scope around the sending
Expand Down

0 comments on commit 9e9a8a1

Please sign in to comment.