Skip to content

Commit

Permalink
Fix test for corrupted headers
Browse files Browse the repository at this point in the history
Remove test for corrupted body as non-SQL Server transport specific
  • Loading branch information
SzymonPobiega committed Oct 18, 2019
1 parent 8e22abd commit 2150f6f
Showing 1 changed file with 4 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ await Scenario.Define<Context>()
})
.Done(c =>
{
var firstErrorEntry = c.Logs.FirstOrDefault(l => l.Level == LogLevel.Error);
var firstErrorEntry = c.Logs.FirstOrDefault(l => l.Level == LogLevel.Error
&& l.Message != null
&& l.Message.StartsWith("Error receiving message. Probable message metadata corruption. Moving to error queue."));
c.FirstErrorEntry = firstErrorEntry?.Message;
return firstErrorEntry != null;
return c.FirstErrorEntry != null;
})
.Run();

Expand All @@ -86,72 +88,6 @@ await Scenario.Define<Context>()
}
}

#if NET452
[TestCase(TransportTransactionMode.TransactionScope)]
#endif
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
[TestCase(TransportTransactionMode.ReceiveOnly)]
[TestCase(TransportTransactionMode.None)]
public async Task Should_move_it_to_the_error_queue_when_body_corrupted(TransportTransactionMode txMode)
{
PurgeQueues(connectionString);
try
{
await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b =>
{
b.DoNotFailOnErrorMessages();
b.CustomConfig(c =>
{
c.UseTransport<SqlServerTransport>()
.Transactions(txMode);
});
b.When(async (bus, c) =>
{
var endpoint = Conventions.EndpointNamingConvention(typeof(Endpoint));

using (var conn = new SqlConnection(connectionString))
{
await conn.OpenAsync();
var command = conn.CreateCommand();
var guid = Guid.NewGuid();
command.CommandText =
$@"INSERT INTO [dbo].[{endpoint}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body])
VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)";
command.Parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = guid;
command.Parameters.Add("CorrelationId", SqlDbType.UniqueIdentifier).Value = guid;
command.Parameters.Add("ReplyToAddress", SqlDbType.VarChar).Value = "";
command.Parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
command.Parameters.Add("Expires", SqlDbType.DateTime).Value = DateTime.Now.AddHours(1);
command.Parameters.Add("Headers", SqlDbType.VarChar).Value = "{\"NServiceBus.MessageIntent\":\"Send\",\"NServiceBus.CorrelationId\":\"{guid}\"," +
"\"NServiceBus.OriginatingMachine\":\"SCHMETTERLING\",\"NServiceBus.OriginatingEndpoint\":\"ReceivingWithNativeMultiQueueTransaction.Endpoint\"," +
"\"$.diagnostics.originating.hostid\":\"3c921610dc4d3ddde4347cd65addcb9f\"," +
"\"NServiceBus.ReplyToAddress\":\"ReceivingWithNativeMultiQueueTransaction.Endpoint@dbo\"," +
"\"NServiceBus.ContentType\":\"text/xml\"," +
"\"NServiceBus.EnclosedMessageTypes\":\"NServiceBus.AcceptanceTests.Tx.When_receiving_with_native_multi_queue_transaction + When_receiving_with_native_multi_queue_transaction.MessageHandledEvent, NServiceBus.SqlServer.AcceptanceTests, Version = 0.0.0.0, Culture = neutral, PublicKeyToken = null\"" +
",\"NServiceBus.RelatedTo\":\"{guid}\"," +
"\"NServiceBus.ConversationId\":\"{guid}\"," +
"\"NServiceBus.MessageId\":\"{guid}\"," +
"\"NServiceBus.Version\":\"6.0.0\"," +
"\"NServiceBus.TimeSent\":\"2015-12-16 11:53:22:631646 Z\"}";

command.Parameters.Add("Body", SqlDbType.VarBinary).Value = Encoding.UTF8.GetBytes("body corrupted");

await command.ExecuteNonQueryAsync();
}
});
})
.Done(c => c.Logs.Any(l => l.Level == LogLevel.Error))
.Run();

Assert.True(MessageExistsInErrorQueue(connectionString), "The message should have been moved to the error queue");
}
finally
{
PurgeQueues(connectionString);
}
}

static void PurgeQueues(string connectionString)
{
using (var conn = new SqlConnection(connectionString))
Expand Down

0 comments on commit 2150f6f

Please sign in to comment.