From 5f7c0ecec62883b116e3385064dd40f3b211d7fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 16 May 2023 20:15:57 +0200 Subject: [PATCH] Use correct index position when not reading the message body as a stream (#1179) (#1180) * Add acceptance test to reproduce issue * Better test name * Fix the bug * Use strings to avoid breking the net472 build with the old client * Exclude tests on the old client * Include .NET Framework * Cross compile also other legacy test project * Proper range --------- Co-authored-by: danielmarbach --- .../NServiceBus.SqlServer.csproj | 6 +- .../When_using_column_encrypted_connection.cs | 88 +++++++++++++++++++ .../NServiceBus.Transport.SqlServer.csproj | 4 +- .../Queuing/MessageRow.cs | 14 +-- 4 files changed, 100 insertions(+), 12 deletions(-) create mode 100644 src/NServiceBus.Transport.SqlServer.AcceptanceTests/When_using_column_encrypted_connection.cs diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index eff5deaf0..4b618dd34 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -19,9 +19,9 @@ - - - + + + diff --git a/src/NServiceBus.Transport.SqlServer.AcceptanceTests/When_using_column_encrypted_connection.cs b/src/NServiceBus.Transport.SqlServer.AcceptanceTests/When_using_column_encrypted_connection.cs new file mode 100644 index 000000000..3e4c857f0 --- /dev/null +++ b/src/NServiceBus.Transport.SqlServer.AcceptanceTests/When_using_column_encrypted_connection.cs @@ -0,0 +1,88 @@ +namespace NServiceBus.Transport.SqlServer.AcceptanceTests +{ + using System; +#if SYSTEMDATASQLCLIENT + using System.Data.SqlClient; +#else + using Microsoft.Data.SqlClient; +#endif + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using NServiceBus.AcceptanceTests; + using NUnit.Framework; + + public class When_using_column_encrypted_connection : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_work() + { +#if SYSTEMDATASQLCLIENT && NET + Assert.Ignore("System.Data.SqlClient doesn't support this setting on .NET (works on .NET Framework)"); +#endif + var ctx = await Scenario.Define() + .WithEndpoint(b => b.When((bus, c) => bus.SendLocal(new Message()))) + .Done(c => c.MessageReceived) + .Run(); + + Assert.True(ctx.MessageReceived, "Message should be properly received"); + } + + static string GetConnectionString() => + Environment.GetEnvironmentVariable("SqlServerTransportConnectionString") ?? @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"; + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() + { + var transport = new SqlServerTransport(async cancellationToken => + { + var connectionString = GetConnectionString(); + + if (!connectionString.EndsWith(";")) + { + connectionString += ";"; + } + + connectionString += "Column Encryption Setting=enabled"; + + var connection = new SqlConnection(connectionString); + + await connection.OpenAsync(cancellationToken); + + return connection; + }); + + EndpointSetup(new CustomizedServer(transport), (c, sd) => + { + c.OverridePublicReturnAddress($"{Conventions.EndpointNamingConvention(typeof(Endpoint))}@dbo@nservicebus"); + }); + } + + class Handler : IHandleMessages + { + readonly Context scenarioContext; + public Handler(Context scenarioContext) + { + this.scenarioContext = scenarioContext; + } + + public Task Handle(Message message, IMessageHandlerContext context) + { + scenarioContext.MessageReceived = true; + + return Task.FromResult(0); + } + } + } + + public class Context : ScenarioContext + { + public bool MessageReceived { get; set; } + } + + public class Message : IMessage + { + } + } +} diff --git a/src/NServiceBus.Transport.SqlServer/NServiceBus.Transport.SqlServer.csproj b/src/NServiceBus.Transport.SqlServer/NServiceBus.Transport.SqlServer.csproj index e31793f97..e2b94f1a8 100644 --- a/src/NServiceBus.Transport.SqlServer/NServiceBus.Transport.SqlServer.csproj +++ b/src/NServiceBus.Transport.SqlServer/NServiceBus.Transport.SqlServer.csproj @@ -15,8 +15,8 @@ - - + + diff --git a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs index 5b225e392..0502c709f 100644 --- a/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs +++ b/src/NServiceBus.Transport.SqlServer/Queuing/MessageRow.cs @@ -50,7 +50,7 @@ static async Task ReadRow(SqlDataReader dataReader, bool isStreamSup id = await dataReader.GetFieldValueAsync(0, cancellationToken).ConfigureAwait(false), expired = await dataReader.GetFieldValueAsync(1, cancellationToken).ConfigureAwait(false) == 1, headers = await GetHeaders(dataReader, 2, cancellationToken).ConfigureAwait(false), - bodyBytes = isStreamSupported ? await GetBody(dataReader, 3, cancellationToken).ConfigureAwait(false) : await GetNonStreamBody(dataReader, 5, cancellationToken).ConfigureAwait(false) + bodyBytes = await GetBody(dataReader, 3, isStreamSupported, cancellationToken).ConfigureAwait(false) }; } @@ -80,8 +80,13 @@ static async Task GetHeaders(SqlDataReader dataReader, int headersIndex, } } - static async Task GetBody(SqlDataReader dataReader, int bodyIndex, CancellationToken cancellationToken) + static async Task GetBody(SqlDataReader dataReader, int bodyIndex, bool isStreamSupported, CancellationToken cancellationToken) { + if (!isStreamSupported) + { + return (byte[])dataReader[bodyIndex]; + } + // Null values will be returned as an empty (zero bytes) Stream. using (var outStream = new MemoryStream()) using (var stream = dataReader.GetStream(bodyIndex)) @@ -92,11 +97,6 @@ static async Task GetBody(SqlDataReader dataReader, int bodyIndex, Cance } } - static Task GetNonStreamBody(SqlDataReader dataReader, int bodyIndex, CancellationToken cancellationToken) - { - return Task.FromResult((byte[])dataReader[bodyIndex]); - } - void AddParameter(SqlCommand command, string name, SqlDbType type, object value) { command.Parameters.Add(name, type).Value = value ?? DBNull.Value;