From 3ae77a1ff862d5a502152710a15213af7583b879 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 6 Aug 2024 12:13:27 +0200 Subject: [PATCH 1/6] Add a few blunt tests to verify the new behavior --- src/Tests/Sending/MessageDispatcherTests.cs | 66 +++++++++++---------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/src/Tests/Sending/MessageDispatcherTests.cs b/src/Tests/Sending/MessageDispatcherTests.cs index 039b8bf4..0c4f3d81 100644 --- a/src/Tests/Sending/MessageDispatcherTests.cs +++ b/src/Tests/Sending/MessageDispatcherTests.cs @@ -336,28 +336,6 @@ public async Task Should_allow_mixing_operations() Assert.That(someTopicSenderBatchContent, Has.Count.EqualTo(1)); } - [Test] - public void Should_throw_when_batch_size_exceeded() - { - var client = new FakeServiceBusClient(); - var sender = new FakeSender(); - client.Senders["SomeDestination"] = sender; - - sender.TryAdd = _ => false; - - var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), "sometopic"); - - var operation1 = - new TransportOperation(new OutgoingMessage("SomeId", - [], - ReadOnlyMemory.Empty), - new UnicastAddressTag("SomeDestination"), - [], - DispatchConsistency.Default); - - Assert.ThrowsAsync(async () => await dispatcher.Dispatch(new TransportOperations(operation1), new TransportTransaction())); - } - [Test] public async Task Should_use_connection_information_of_existing_service_bus_transaction() { @@ -435,7 +413,7 @@ public void Should_throw_when_detecting_more_than_hundred_messages_when_transact } [Test] - public async Task Should_split_into_batches_of_max_4500_when_no_transactions_used() + public async Task Should_split_into_multiple_batches_according_to_the_sdk() { var defaultClient = new FakeServiceBusClient(); var defaultSender = new FakeSender(); @@ -444,7 +422,7 @@ public async Task Should_split_into_batches_of_max_4500_when_no_transactions_use bool firstTime = true; defaultSender.TryAdd = msg => { - if ((string)msg.ApplicationProperties["Number"] != "4550" || !firstTime) + if ((string)msg.ApplicationProperties["Number"] != "150" || !firstTime) { return true; } @@ -455,8 +433,8 @@ public async Task Should_split_into_batches_of_max_4500_when_no_transactions_use var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic"); - var operations = new List(4600); - for (int i = 0; i < 4600; i++) + var operations = new List(200); + for (int i = 0; i < 200; i++) { operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}", new Dictionary { { "Number", i.ToString() } }, @@ -470,13 +448,41 @@ public async Task Should_split_into_batches_of_max_4500_when_no_transactions_use await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction); - Assert.That(defaultSender.BatchSentMessages, Has.Count.EqualTo(3)); + Assert.That(defaultSender.BatchSentMessages, Has.Count.EqualTo(2)); var firstBatch = defaultSender[defaultSender.BatchSentMessages.ElementAt(0)]; var secondBatch = defaultSender[defaultSender.BatchSentMessages.ElementAt(1)]; - var thirdBatch = defaultSender[defaultSender.BatchSentMessages.ElementAt(2)]; - Assert.That(firstBatch, Has.Count.EqualTo(4500)); + Assert.That(firstBatch, Has.Count.EqualTo(150)); Assert.That(secondBatch, Has.Count.EqualTo(50)); - Assert.That(thirdBatch, Has.Count.EqualTo(50)); + } + + [Test] + public async Task Should_fallback_to_individual_sends_when_messages_cannot_be_added_to_batch() + { + var defaultClient = new FakeServiceBusClient(); + var defaultSender = new FakeSender(); + defaultClient.Senders["SomeDestination"] = defaultSender; + + defaultSender.TryAdd = msg => false; + + var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic"); + + var operations = new List(5); + for (int i = 0; i < 5; i++) + { + operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}", + new Dictionary { { "Number", i.ToString() } }, + ReadOnlyMemory.Empty), + new UnicastAddressTag("SomeDestination"), + [], + DispatchConsistency.Default)); + } + + var azureServiceBusTransaction = new AzureServiceBusTransportTransaction(); + + await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction); + + Assert.That(defaultSender.BatchSentMessages, Has.Count.Zero); + Assert.That(defaultSender.IndividuallySentMessages, Has.Count.EqualTo(5)); } [Test] From 6ad77a49d0026c64d20c70c8e8f14b05295b33d5 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 6 Aug 2024 12:14:28 +0200 Subject: [PATCH 2/6] Fallback to individual sends when batching is not possible --- src/Transport/Sending/MessageDispatcher.cs | 52 ++++++++++------------ 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/src/Transport/Sending/MessageDispatcher.cs b/src/Transport/Sending/MessageDispatcher.cs index b0b97e39..9b426c25 100644 --- a/src/Transport/Sending/MessageDispatcher.cs +++ b/src/Transport/Sending/MessageDispatcher.cs @@ -120,6 +120,7 @@ void AddBatchedOperationsTo(List dispatchTasks, var operations = destinationAndOperations.Value; var messagesToSend = new Queue(operations.Count); + var messagesThatCouldntBeSent = new List(operations.Count); foreach (var operation in operations) { var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); @@ -128,25 +129,21 @@ void AddBatchedOperationsTo(List dispatchTasks, } // Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet // doing the access as late as possible but still on the synchronous path. - dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, cancellationToken)); + dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, messagesThatCouldntBeSent, cancellationToken)); + + foreach (var message in messagesThatCouldntBeSent) + { + dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, message, cancellationToken)); + } } } - async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, Queue messagesToSend, CancellationToken cancellationToken) + async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, + Queue messagesToSend, List messagesThatCouldntBeSent, + CancellationToken cancellationToken) { - var messageCount = messagesToSend.Count; int batchCount = 0; var sender = messageSenderRegistry.GetMessageSender(destination, client); - // There are two limits for batching that unfortunately are not enforced over TryAdd. - // - // Limit 1: For transactional sends you cannot add more than 100 messages into the same batch. This limit - // is enforced when the batch is attempted to be sent. - // - // Limit 2: For non-transactional sends you cannot add more than 4500 messages into the same batch. This limit - // is enforced when the batch is attempted to be sent. There are plans to incorporate this limit into - // the TryAdd logic, see https://github.com/Azure/azure-sdk-for-net/issues/21451. Even though with all - // the headers we will probably never reach 4500 messages per batch this upper limit was added as a precaution - int maxItemsPerBatch = transaction == null ? 4500 : 100; while (messagesToSend.Count > 0) { using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(cancellationToken) @@ -158,28 +155,27 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli logBuilder = new StringBuilder(); } - var peekedMessage = messagesToSend.Peek(); - if (messageBatch.TryAddMessage(peekedMessage)) + var dequeueMessage = messagesToSend.Dequeue(); + if (messageBatch.TryAddMessage(dequeueMessage)) { - var added = messagesToSend.Dequeue(); if (Log.IsDebugEnabled) { - added.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId); - logBuilder!.Append($"{messageId ?? added.MessageId},"); + dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId); + logBuilder!.Append($"{messageId ?? dequeueMessage.MessageId},"); } } else { - peekedMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId); - var message = - @$"Unable to add the message '#{messageCount - messagesToSend.Count}' with message id '{messageId ?? peekedMessage.MessageId}' to the the batch '#{batchCount}'. -The message may be too large, or the batch size has reached the maximum allowed messages per batch for the current tier selected for the namespace '{sender.FullyQualifiedNamespace}'. -To mitigate this problem, either reduce the message size by using the data bus, upgrade to a higher Service Bus tier, or increase the maximum message size. -If the maximum message size is increased, the endpoint must be restarted for the change to take effect."; - throw new ServiceBusException(message, ServiceBusFailureReason.MessageSizeExceeded); + // TBD Logging + messagesThatCouldntBeSent.Add(dequeueMessage); + continue; } - while (messagesToSend.Count > 0 && messageBatch.Count < maxItemsPerBatch && messageBatch.TryAddMessage(messagesToSend.Peek())) + // Trying to add as many messages as we can to the batch. TryAdd might return false due to the batch being full + // or the message being too large. In the case when the message is too large for the batch the next iteration + // will try to add it to a fresh batch and if that fails too we will add it to the list of messages that couldn't be sent + // trying to attempt to send them individually. + while (messagesToSend.Count > 0 && messageBatch.TryAddMessage(messagesToSend.Peek())) { var added = messagesToSend.Dequeue(); if (Log.IsDebugEnabled) @@ -228,12 +224,12 @@ void AddIsolatedOperationsTo(List dispatchTasks, { var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); operation.ApplyCustomizationToOutgoingNativeMessage(message, transportTransaction, Log); - dispatchTasks.Add(DispatchIsolatedForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken)); + dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken)); } } } - async Task DispatchIsolatedForDestination(string destination, ServiceBusClient? client, Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken) + async Task DispatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken) { var sender = messageSenderRegistry.GetMessageSender(destination, client); // Making sure we have a suppress scope around the sending From 4cfd768a25ed3befc26debe15b142e0827fb13ce Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 6 Aug 2024 12:30:18 +0200 Subject: [PATCH 3/6] Additional test scenario --- src/Tests/Sending/MessageDispatcherTests.cs | 37 +++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/Tests/Sending/MessageDispatcherTests.cs b/src/Tests/Sending/MessageDispatcherTests.cs index 0c4f3d81..86a0bc7c 100644 --- a/src/Tests/Sending/MessageDispatcherTests.cs +++ b/src/Tests/Sending/MessageDispatcherTests.cs @@ -485,6 +485,43 @@ public async Task Should_fallback_to_individual_sends_when_messages_cannot_be_ad Assert.That(defaultSender.IndividuallySentMessages, Has.Count.EqualTo(5)); } + [Test] + public async Task Should_fallback_to_individual_send_when_a_message_cannot_be_added_to_a_batch_but_batch_all_others() + { + var defaultClient = new FakeServiceBusClient(); + var defaultSender = new FakeSender(); + defaultClient.Senders["SomeDestination"] = defaultSender; + + defaultSender.TryAdd = msg => + { + return (string)msg.ApplicationProperties["Number"] switch + { + "4" or "7" => false, + _ => true, + }; + }; + + var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), "sometopic"); + + var operations = new List(5); + for (int i = 0; i < 10; i++) + { + operations.Add(new TransportOperation(new OutgoingMessage($"SomeId{i}", + new Dictionary { { "Number", i.ToString() } }, + ReadOnlyMemory.Empty), + new UnicastAddressTag("SomeDestination"), + [], + DispatchConsistency.Default)); + } + + var azureServiceBusTransaction = new AzureServiceBusTransportTransaction(); + + await dispatcher.Dispatch(new TransportOperations(operations.ToArray()), azureServiceBusTransaction.TransportTransaction); + + Assert.That(defaultSender.BatchSentMessages, Has.Count.EqualTo(3)); + Assert.That(defaultSender.IndividuallySentMessages, Has.Count.EqualTo(2)); + } + [Test] public async Task Should_use_default_connection_information_when_existing_service_bus_transaction_has_none() { From b46a7d0938d2161433d41dc76fcad7c0e5bd30d4 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 6 Aug 2024 12:35:40 +0200 Subject: [PATCH 4/6] Logging --- src/Transport/Sending/MessageDispatcher.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Transport/Sending/MessageDispatcher.cs b/src/Transport/Sending/MessageDispatcher.cs index 9b426c25..54692844 100644 --- a/src/Transport/Sending/MessageDispatcher.cs +++ b/src/Transport/Sending/MessageDispatcher.cs @@ -120,7 +120,9 @@ void AddBatchedOperationsTo(List dispatchTasks, var operations = destinationAndOperations.Value; var messagesToSend = new Queue(operations.Count); - var messagesThatCouldntBeSent = new List(operations.Count); + // We assume the majority of the messages will be batched and only a few will be sent individually + // and therefore it is OK in those rare cases for the list to grow. + var messagesThatCouldntBeSent = new List(0); foreach (var operation in operations) { var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); @@ -166,7 +168,11 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli } else { - // TBD Logging + if (Log.IsDebugEnabled) + { + dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId); + Log.Debug($"Message '{messageId ?? dequeueMessage.MessageId}' is too large for the batch '{batchCount}' and will be sent individually to destination {destination}."); + } messagesThatCouldntBeSent.Add(dequeueMessage); continue; } From 4e83aee45fa5061774c774088fb0417a623dc223 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 6 Aug 2024 14:48:42 +0200 Subject: [PATCH 5/6] Rename local --- src/Transport/Sending/MessageDispatcher.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Transport/Sending/MessageDispatcher.cs b/src/Transport/Sending/MessageDispatcher.cs index 54692844..e9fd80e5 100644 --- a/src/Transport/Sending/MessageDispatcher.cs +++ b/src/Transport/Sending/MessageDispatcher.cs @@ -122,7 +122,7 @@ void AddBatchedOperationsTo(List dispatchTasks, var messagesToSend = new Queue(operations.Count); // We assume the majority of the messages will be batched and only a few will be sent individually // and therefore it is OK in those rare cases for the list to grow. - var messagesThatCouldntBeSent = new List(0); + var messagesTooLargeToBeBatched = new List(0); foreach (var operation in operations) { var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); @@ -131,9 +131,9 @@ void AddBatchedOperationsTo(List dispatchTasks, } // Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet // doing the access as late as possible but still on the synchronous path. - dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, messagesThatCouldntBeSent, cancellationToken)); + dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, messagesTooLargeToBeBatched, cancellationToken)); - foreach (var message in messagesThatCouldntBeSent) + foreach (var message in messagesTooLargeToBeBatched) { dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, message, cancellationToken)); } @@ -141,7 +141,7 @@ void AddBatchedOperationsTo(List dispatchTasks, } async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, - Queue messagesToSend, List messagesThatCouldntBeSent, + Queue messagesToSend, List messagesTooLargeToBeBatched, CancellationToken cancellationToken) { int batchCount = 0; @@ -173,7 +173,7 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId); Log.Debug($"Message '{messageId ?? dequeueMessage.MessageId}' is too large for the batch '{batchCount}' and will be sent individually to destination {destination}."); } - messagesThatCouldntBeSent.Add(dequeueMessage); + messagesTooLargeToBeBatched.Add(dequeueMessage); continue; } From 5d81c7c679044aacaba882ceda1caba4cbd1ebd3 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 7 Aug 2024 10:50:16 +0200 Subject: [PATCH 6/6] Adding another comment to explain things even better --- src/Transport/Sending/MessageDispatcher.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Transport/Sending/MessageDispatcher.cs b/src/Transport/Sending/MessageDispatcher.cs index e9fd80e5..ff2abf17 100644 --- a/src/Transport/Sending/MessageDispatcher.cs +++ b/src/Transport/Sending/MessageDispatcher.cs @@ -158,6 +158,9 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli } var dequeueMessage = messagesToSend.Dequeue(); + // In this case the batch is fresh and doesn't have any messages yet. If TryAdd returns false + // we know the message can never be added to any batch and therefore we collect it to be sent + // individually. if (messageBatch.TryAddMessage(dequeueMessage)) { if (Log.IsDebugEnabled)