Skip to content

Commit

Permalink
Merge pull request #2350 from Particular/backport-process-raw-message…
Browse files Browse the repository at this point in the history
…s-to-6-2

Backport the processing of raw messages to 6.2
  • Loading branch information
SzymonPobiega authored Nov 13, 2023
2 parents 0c920c8 + 3d68559 commit 4c12f41
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.4.2" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="3.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Amazon.SQS.Model;
using Configuration.AdvancedExtensibility;
using EndpointTemplates;
using global::Newtonsoft.Json;
using NUnit.Framework;
using Transport.SQS.Tests;

Expand All @@ -35,41 +36,27 @@ await NativeEndpoint.SendTo<Receiver>(new Dictionary<string, MessageAttributeVal
}

[Test]
public async Task Should_fail_when_messagetypefullname_not_present()
public async Task Should_be_processed_if_type_metadata_is_embedded()
{
using var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
try
{
await Scenario.Define<Context>()
.WithEndpoint<Receiver>(c =>
var context = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(c => c.CustomConfig(cfg =>
{
var serialization = cfg.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings()
{
c.CustomConfig((cfg, ctx) =>
{
ctx.ErrorQueueAddress = cfg.GetSettings().ErrorQueueAddress();
});
c.When(async (session, ctx) =>
{
await NativeEndpoint.SendTo<Receiver>(new Dictionary<string, MessageAttributeValue>
{
// unfortunately only the message id attribute is preserved when moving to the poison queue
{
Headers.MessageId, new MessageAttributeValue {DataType = "String", StringValue = ctx.TestRunId.ToString()}
}
}, MessageToSend);
_ = NativeEndpoint.ConsumePoisonQueue(ctx.TestRunId, ctx.ErrorQueueAddress, _ =>
{
ctx.MessageMovedToPoisonQueue = true;
}, cancellationToken);
}).DoNotFailOnErrorMessages();
})
.Done(c => c.MessageMovedToPoisonQueue)
.Run();
}
finally
{
cancellationTokenSource.Cancel();
}
TypeNameHandling = TypeNameHandling.Auto
});
}).When(async _ =>
{
await NativeEndpoint.SendTo<Receiver>(new Dictionary<string, MessageAttributeValue>
{
{"SomeAttribute", new MessageAttributeValue {DataType = "String", StringValue = "SomeValue"}}
}, @$"{{ ""$type"": ""{typeof(Message).AssemblyQualifiedName}"", ""ThisIsTheMessage"": ""Hello!""}}");
}))
.Done(c => c.MessageReceived != null)
.Run();

Assert.AreEqual("Hello!", context.MessageReceived);
}

[Test]
Expand Down
Loading

0 comments on commit 4c12f41

Please sign in to comment.