Uses a SQL Server varbinary to store attachments for messages.
Two settings are required as part of the default usage:
- A connection factory that returns an open instance of a SqlConnection. Note that any Exception that occurs during opening the connection should be handled by the factory.
- A default time to keep for attachments.
configuration.EnableAttachments(
fileShare: "networkSharePath",
timeToKeep: _ => TimeSpan.FromDays(7));
configuration.EnableAttachments(
connectionFactory: async cancel =>
{
var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync(cancel).ConfigureAwait(false);
return connection;
}
catch
{
await connection.DisposeAsync();
throw;
}
},
timeToKeep: _ => TimeSpan.FromDays(7));
Extract out the connection factory to a helper method
async Task<SqlConnection> OpenConnection(Cancel cancel)
{
var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync(cancel).ConfigureAwait(false);
return connection;
}
catch
{
await connection.DisposeAsync();
throw;
}
}
Also uses the NServiceBus.Attachments.Sql.TimeToKeep.Default
method for attachment cleanup.
This usage results in the following:
configuration.EnableAttachments(
fileShare: "networkSharePath",
timeToKeep: TimeToKeep.Default);
configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
Attachments can leverage the ambient SQL connectivity from either the transport and/or the persister.
If both UseSynchronizedStorageSessionConnectivity
and UseTransportConnectivity
are defined, the SynchronizedStorageSession
will be used first, followed by the TransportTransaction
.
To use the ambient SynchronizedStorageSession persister:
var attachments = configuration.EnableAttachments(
OpenConnection,
TimeToKeep.Default);
attachments.UseSynchronizedStorageSessionConnectivity();
This approach attempts to use the SynchronizedStorageSession using the following steps:
- For the current context attempt to retrieve an instance of
SynchronizedStorageSession
. If noSynchronizedStorageSession
exists, don't continue and fall back to the SqlConnection retrieved by theconnectionFactory
. - Attempt to retrieve a property named 'Transaction' that is a SqlTransaction from the
SynchronizedStorageSession
. If it exists, use it for all SQL operations in the current pipeline. - Attempt to retrieve a property named 'Connection' that is a SqlConnection from the
SynchronizedStorageSession
. If it exists, use it for all SQL operations in the current pipeline.
The properties are retrieved using reflection since there is no API in NServiceBus to access SynchronizedStorageSession data via type.
To use the ambient transport transaction:
var attachments = configuration.EnableAttachments(
OpenConnection,
TimeToKeep.Default);
attachments.UseTransportConnectivity();
This approach attempts to use the transport transaction using the following steps:
- For the current context, attempt to retrieve an instance of
TransportTransaction
. If noTransportTransaction
exists, don't continue and fall back to using the SqlConnection retrieved by theconnectionFactory
. - Attempt to retrieve an instance of Transaction from the
TransportTransaction
. If it exists, use it in SqlConnection.EnlistTransaction with an instance of SqlConnection retrieved by theconnectionFactory
. Then use that SqlConnection for all SQL operations in the current pipeline. - Attempt to retrieve an instance of SqlTransaction from the
TransportTransaction
. If it exists, use it for all SQL operations in the current pipeline. - Attempt to retrieve an instance of SqlConnection from the
TransportTransaction
. If it exists, use it for all SQL operations in the current pipeline. - Any attachments associated with a message send will be deleted after message processing.
To streamline development the attachment installer is, by default, executed at endpoint startup, in the same manner as all other installers.
configuration.EnableInstallers();
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
NOTE: Note that this is also a valid approach for higher level environments.
However in higher level environment scenarios, where standard installers are being run, but the SQL attachment installation has been executed as part of a deployment, it may be necessary to explicitly disable the attachment installer executing while leaving standard installers enabled.
configuration.EnableInstallers();
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
attachments.DisableInstaller();
The default table name and schema is dbo.MessageAttachments
. It can be changed with the following:
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
attachments.UseTable(new("CustomAttachmentsTableName", "dbo"));
Attachment cleanup is enabled by default. It can be disabled using the following:
var attachments = configuration.EnableAttachments(
fileShare: "networkSharePath",
timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();
When the cleanup task runs it uses the Expiry
column to determine if a given attachment should be deleted. This column is populated when an attachment is written. When adding an attachment to an outgoing message, all methods accept an optional parameter timeToKeep
of the type GetTimeToKeep
. GetTimeToKeep
is defined as:
public delegate TimeSpan GetTimeToKeep(TimeSpan? messageTimeToBeReceived);
Where messageTimeToBeReceived
is value of TimeToBeReceived. If no timeToKeep
parameter for a specific attachment is defined then the endpoint level timeToKeep
is used.
The result of timeToKeep
is then added to the current date and persisted to the Expiry
column.
The method TimeToKeep.Default
provides a recommended default for for attachment lifetime calculation:
- If TimeToBeReceived is defined then keep attachment for twice that time.
- Else; keep for 10 days.
Approaches to using attachments for an outgoing message.
Note: Stream.Dispose is called after the data has been persisted. As such it is not necessary for any code using attachments to perform this cleanup.
While the below examples illustrate adding an attachment to SendOptions
, equivalent operations can be performed on PublishOptions
and ReplyOptions
The recommended approach for adding an attachment is by providing a delegate that constructs the stream. The execution of this delegate is then deferred until later in the outgoing pipeline, when the instance of the stream is required to be persisted.
There are both async and sync variants.
class HandlerFactory :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => File.OpenRead("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerFactory :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => File.OpenRead("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerFactoryAsync :
IHandleMessages<MyMessage>
{
static HttpClient httpClient = new();
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => httpClient.GetStreamAsync("theUrl"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerFactoryAsync :
IHandleMessages<MyMessage>
{
static HttpClient httpClient = new();
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => httpClient.GetStreamAsync("theUrl"));
return context.Send(new OtherMessage(), sendOptions);
}
}
In some cases an instance of a stream is already available in scope and as such it can be passed directly.
class HandlerInstance :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
var stream = File.OpenRead("FilePath.txt");
attachments.Add(
name: "attachment1",
stream: stream,
cleanup: () => File.Delete("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerInstance :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
var stream = File.OpenRead("FilePath.txt");
attachments.Add(
name: "attachment1",
stream: stream,
cleanup: () => File.Delete("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
Approaches to using attachments for the current incoming message.
Processes an attachment with a specific name.
class HandlerProcessStream :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStream(
name: "attachment1",
action: async(stream, token) =>
{
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, token);
},
context.CancellationToken);
}
}
class HandlerProcessStream :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStream(
name: "attachment1",
action: async (stream, token) =>
{
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, token);
},
context.CancellationToken);
}
}
Processes all attachments.
class HandlerProcessStreams :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreams(
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
class HandlerProcessStreams :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreams(
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
Copy an attachment with a specific name to another stream.
class HandlerCopyTo :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachments.CopyTo("attachment1", fileToCopyTo, context.CancellationToken);
}
}
class HandlerCopyTo :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachments.CopyTo("attachment1", fileToCopyTo, context.CancellationToken);
}
}
Get a stream for an attachment with a specific name.
class HandlerGetStream :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var stream = await attachments.GetStream("attachment1", context.CancellationToken);
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, context.CancellationToken);
}
}
class HandlerGetStream :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var attachment = await attachments.GetStream("attachment1", context.CancellationToken);
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachment.CopyToAsync(fileToCopyTo, context.CancellationToken);
}
}
Get a byte array for an attachment with a specific name.
WARNING: This should only be used the data size is know to be small as it causes the full size of the attachment to be allocated in memory.
class HandlerGetBytes :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
var bytes = await attachments.GetBytes("attachment1", context.CancellationToken);
// use the byte array
}
}
class HandlerGetBytes :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
var bytes = await attachments.GetBytes("attachment1", context.CancellationToken);
// use the byte array
}
}
All of the above examples have companion methods that are suffixed with ForMessage
. These methods allow a handler or saga to read any attachments as long as the message id for that attachment is known. For example processing all attachments for a specific message could be done as follows
class HandlerProcessStreamsForMessage :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreamsForMessage(
messageId: "theMessageId",
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var toCopyTo = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(toCopyTo, cancel);
},
context.CancellationToken);
}
}
class HandlerProcessStreamsForMessage :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreamsForMessage(
messageId: "theMessageId",
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
This can be helpful in a saga that is operating in a Scatter-Gather mode. So instead of storing all binaries inside the saga persister, the saga can instead store the message ids and then, at a latter point in time, access those attachments.
The below examples also use the NServiceBus.Testing extension.
public class Handler :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var options = new SendOptions();
var attachments = options.Attachments();
attachments.Add("theName", () => File.OpenRead("aFilePath"));
return context.Send(new OtherMessage(), options);
}
}
public class Handler :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var options = new SendOptions();
var attachments = options.Attachments();
attachments.Add("theName", () => File.OpenRead("aFilePath"));
return context.Send(new OtherMessage(), options);
}
}
[Fact]
public async Task TestOutgoingAttachments()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
//Act
await handler.Handle(new(), context);
// Assert
var sentMessage = context.Sent.Single();
var attachments = sentMessage.Options.Attachments();
var attachment = attachments.Items.Single();
Assert.Contains("theName", attachment.Name);
Assert.True(attachments.HasPendingAttachments);
}
[Fact]
public async Task TestOutgoingAttachments()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
//Act
await handler.Handle(new(), context);
// Assert
var sentMessage = context.Sent.Single();
var attachments = sentMessage.Options.Attachments();
var attachment = attachments.Items.Single();
Assert.Contains("theName", attachment.Name);
Assert.True(attachments.HasPendingAttachments);
}
To mock or verify incoming attachments is it necessary to inject a instance of IMessageAttachments
into the current IMessageHandlerContext
. This can be done using the MockAttachmentHelper.InjectAttachmentsInstance()
extension method which exists in the NServiceBus.Attachments.Testing
namespace.
var context = new RecordingHandlerContext();
var mockMessageAttachments = new MyMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
var context = new RecordingHandlerContext();
var mockMessageAttachments = new MyMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
The implementation of IMessageHandlerContext
can be a custom coded mock or constructed using any of the popular mocking/assertion frameworks.
There is a default implementation of IMessageAttachments
named MockMessageAttachments
. This implementation stubs out all methods. All members are virtual so it can be used as simplified base class for custom mocks.
public class CustomMockMessageAttachments :
MockMessageAttachments
{
public override Task<AttachmentBytes> GetBytes(Cancel cancel = default)
{
GetBytesWasCalled = true;
return Task.FromResult(new AttachmentBytes("name", [5]));
}
public bool GetBytesWasCalled { get; private set; }
}
public class CustomMockMessageAttachments :
MockMessageAttachments
{
public override Task<AttachmentBytes> GetBytes(Cancel cancel = default)
{
GetBytesWasCalled = true;
return Task.FromResult(new AttachmentBytes("name", [5]));
}
public bool GetBytesWasCalled { get; private set; }
}
Putting these parts together allows a handler, using incoming attachments, to be tested.
public class Handler :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachment = context.Attachments();
var bytes = await attachment.GetBytes(context.CancellationToken);
}
}
public class Handler :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachment = context.Attachments();
var bytes = await attachment.GetBytes(context.CancellationToken);
}
}
[Fact]
public async Task TestIncomingAttachment()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
var mockMessageAttachments = new CustomMockMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
//Act
await handler.Handle(new(), context);
//Assert
Assert.True(mockMessageAttachments.GetBytesWasCalled);
}
[Fact]
public async Task TestIncomingAttachment()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
var mockMessageAttachments = new CustomMockMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
//Act
await handler.Handle(new(), context);
//Assert
Assert.True(mockMessageAttachments.GetBytesWasCalled);
}