Skip to content

Commit

Permalink
Test for parallel batching of messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Jan 15, 2024
1 parent c9e29dd commit 47dcb38
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ public async Task publishing()
opts.UseAmazonSqsTransport();

opts.PublishMessage<Message1>()
.ToSqsQueue("outbound1");
.ToSqsQueue("outbound1")

// Increase the outgoing message throughput, but at the cost
// of strict ordering
.MessageBatchMaxDegreeOfParallelism(Environment.ProcessorCount);


opts.PublishMessage<Message2>()
Expand Down
20 changes: 19 additions & 1 deletion src/Transports/AWS/Wolverine.AmazonSqs.Tests/send_and_receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async Task InitializeAsync()

opts.ListenToSqsQueue("send_and_receive");

opts.PublishAllMessages().ToSqsQueue("send_and_receive");
opts.PublishAllMessages().ToSqsQueue("send_and_receive").MessageBatchMaxDegreeOfParallelism(5);
}).StartAsync();
}

Expand All @@ -41,6 +41,24 @@ public async Task send_and_receive_a_single_message()
session.Received.SingleMessage<SqsMessage>()
.Name.ShouldBe(message.Name);
}

[Fact]
public async Task send_and_receive_many_messages()
{
Func<IMessageBus, Task> sending = async bus =>
{
for (int i = 0; i < 100; i++)
{
await bus.PublishAsync(new SqsMessage(Guid.NewGuid().ToString()));
}
};

await _host.TrackActivity()
.IncludeExternalTransports()
.Timeout(5.Minutes())
.ExecuteAndWaitAsync(sending);

}
}

public record SqsMessage(string Name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ internal AmazonSqsQueue(string queueName, AmazonSqsTransport parent) : base(new
EndpointName = queueName;

Configuration = new CreateQueueRequest(QueueName);

MessageBatchSize = 10;
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/Wolverine/Configuration/SubscriberConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public T MessageBatchSize(int batchSize)

/// <summary>
/// For endpoints that send messages in batches, this governs the maximum number
/// of concurrent outgoing batches
/// of concurrent outgoing batches. The default is 1 to ensure message order, but increase this
/// number to improve outgoing throughput
/// </summary>
public T MessageBatchMaxDegreeOfParallelism(int batchMaxDegreeOfParallelism)
{
Expand Down

0 comments on commit 47dcb38

Please sign in to comment.