Skip to content

Commit

Permalink
Merge pull request #283 from Particular/LinqAdieu
Browse files Browse the repository at this point in the history
MessageDispatcher no longer uses LINQ and the allocations that come with it
  • Loading branch information
tmasternak authored Jul 29, 2016
2 parents 9ae5b2b + 041b51c commit 1b70e13
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ class FakeTableBasedQueueDispatcher : IQueueDispatcher
{
public List<string> DispatchedMessageIds = new List<string>();

public Task DispatchAsNonIsolated(List<MessageWithAddress> operations, ContextBag context)
public Task DispatchAsNonIsolated(HashSet<MessageWithAddress> operations, ContextBag context)
{
DispatchedMessageIds.AddRange(operations.Select(x => x.Message.MessageId));
return Task.FromResult(0);
}

public Task DispatchAsIsolated(List<MessageWithAddress> operations)
public Task DispatchAsIsolated(HashSet<MessageWithAddress> operations)
{
DispatchedMessageIds.AddRange(operations.Select(x => x.Message.MessageId));
return Task.FromResult(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public LegacyTableBasedQueueDispatcher(LegacySqlConnectionFactory connectionFact
this.connectionFactory = connectionFactory;
}

public virtual async Task DispatchAsNonIsolated(List<MessageWithAddress> operations, ContextBag context)
public virtual async Task DispatchAsNonIsolated(HashSet<MessageWithAddress> operations, ContextBag context)
{
//If dispatch is not isolated then either TS has been created by the receive operation or needs to be created here.
using (var scope = new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled))
Expand All @@ -29,7 +29,7 @@ public virtual async Task DispatchAsNonIsolated(List<MessageWithAddress> operati
}
}

public virtual async Task DispatchAsIsolated(List<MessageWithAddress> operations)
public virtual async Task DispatchAsIsolated(HashSet<MessageWithAddress> operations)
{
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
{
Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.SqlServer/Sending/IQueueDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace NServiceBus.Transport.SQLServer

interface IQueueDispatcher
{
Task DispatchAsNonIsolated(List<MessageWithAddress> operations, ContextBag context);
Task DispatchAsNonIsolated(HashSet<MessageWithAddress> operations, ContextBag context);

Task DispatchAsIsolated(List<MessageWithAddress> operations);
Task DispatchAsIsolated(HashSet<MessageWithAddress> operations);
}
}
22 changes: 10 additions & 12 deletions src/NServiceBus.SqlServer/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Extensibility;
using Transports;
Expand All @@ -22,18 +21,17 @@ public async Task Dispatch(TransportOperations operations, ContextBag context)
await Dispatch(operations, ops => dispatcher.DispatchAsNonIsolated(ops, context), DispatchConsistency.Default).ConfigureAwait(false);
}

Task Dispatch(TransportOperations operations, Func<List<MessageWithAddress>, Task> dispatchMethod, DispatchConsistency dispatchConsistency)
Task Dispatch(TransportOperations operations, Func<HashSet<MessageWithAddress>, Task> dispatchMethod, DispatchConsistency dispatchConsistency)
{
var isolatedOperations = operations.UnicastTransportOperations.Where(o => o.RequiredDispatchConsistency == dispatchConsistency);
var deduplicatedIsolatedOperations = DeduplicateBasedOnMessageIdAndQueueAddress(isolatedOperations).ToList();
return dispatchMethod(deduplicatedIsolatedOperations);
}

IEnumerable<MessageWithAddress> DeduplicateBasedOnMessageIdAndQueueAddress(IEnumerable<UnicastTransportOperation> isolatedConsistencyOperations)
{
return isolatedConsistencyOperations
.Select(o => new MessageWithAddress(o.Message, addressParser.Parse(o.Destination)))
.Distinct(OperationByMessageIdAndQueueAddressComparer);
var deduplicatedOperations = new HashSet<MessageWithAddress>(OperationByMessageIdAndQueueAddressComparer);
foreach (var operation in operations.UnicastTransportOperations)
{
if (operation.RequiredDispatchConsistency == dispatchConsistency)
{
deduplicatedOperations.Add(new MessageWithAddress(operation.Message, addressParser.Parse(operation.Destination)));
}
}
return dispatchMethod(deduplicatedOperations);
}

IQueueDispatcher dispatcher;
Expand Down
10 changes: 5 additions & 5 deletions src/NServiceBus.SqlServer/Sending/TableBasedQueueDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public TableBasedQueueDispatcher(SqlConnectionFactory connectionFactory)
this.connectionFactory = connectionFactory;
}

public async Task DispatchAsIsolated(List<MessageWithAddress> operations)
public async Task DispatchAsIsolated(HashSet<MessageWithAddress> operations)
{
if (operations.Count == 0)
{
Expand All @@ -31,7 +31,7 @@ public async Task DispatchAsIsolated(List<MessageWithAddress> operations)
}
}

public async Task DispatchAsNonIsolated(List<MessageWithAddress> operations, ContextBag context)
public async Task DispatchAsNonIsolated(HashSet<MessageWithAddress> operations, ContextBag context)
{
if (operations.Count == 0)
{
Expand All @@ -50,7 +50,7 @@ public async Task DispatchAsNonIsolated(List<MessageWithAddress> operations, Con
}


async Task DispatchOperationsWithNewConnectionAndTransaction(List<MessageWithAddress> operations)
async Task DispatchOperationsWithNewConnectionAndTransaction(HashSet<MessageWithAddress> operations)
{
using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
{
Expand All @@ -68,7 +68,7 @@ async Task DispatchOperationsWithNewConnectionAndTransaction(List<MessageWithAdd
}
}

async Task DispatchUsingReceiveTransaction(TransportTransaction transportTransaction, List<MessageWithAddress> operations)
async Task DispatchUsingReceiveTransaction(TransportTransaction transportTransaction, HashSet<MessageWithAddress> operations)
{
SqlConnection sqlTransportConnection;
SqlTransaction sqlTransportTransaction;
Expand All @@ -91,7 +91,7 @@ async Task DispatchUsingReceiveTransaction(TransportTransaction transportTransac
}
}

static async Task Send(List<MessageWithAddress> operations, SqlConnection connection, SqlTransaction transaction)
static async Task Send(HashSet<MessageWithAddress> operations, SqlConnection connection, SqlTransaction transaction)
{
foreach (var operation in operations)
{
Expand Down

0 comments on commit 1b70e13

Please sign in to comment.