Skip to content

Commit

Permalink
Merge pull request #78 from ericfernance/add-support-for-fifo-sns
Browse files Browse the repository at this point in the history
Adds SnsFifoStamp class based of the Symfony base stamp for SQS and a…
  • Loading branch information
mnapoli authored Nov 24, 2023
2 parents c3f00f1 + 669bba8 commit 229e7bc
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 1 deletion.
26 changes: 26 additions & 0 deletions src/Service/Sns/SnsFifoStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace Bref\Symfony\Messenger\Service\Sns;

use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

class SnsFifoStamp implements NonSendableStampInterface {
private ?string $messageGroupId;
private ?string $messageDeduplicationId;

public function __construct(string $messageGroupId = null, string $messageDeduplicationId = null)
{
$this->messageGroupId = $messageGroupId;
$this->messageDeduplicationId = $messageDeduplicationId;
}

public function getMessageGroupId(): ?string
{
return $this->messageGroupId;
}

public function getMessageDeduplicationId(): ?string
{
return $this->messageDeduplicationId;
}
}
16 changes: 15 additions & 1 deletion src/Service/Sns/SnsTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,21 @@ public function send(Envelope $envelope): Envelope
'Message' => $encodedMessage['body'],
'TopicArn' => $this->topic,
];

if (str_contains($this->topic, ".fifo")) {
$stamps = $envelope->all();
$dedupeStamp = $stamps[SnsFifoStamp::class][0] ?? false;
if (!$dedupeStamp) {
throw new Exception("SnsFifoStamp required for fifo topic");
}
$messageGroupId = $dedupeStamp->getMessageGroupId() ?? false;
$messageDeDuplicationId = $dedupeStamp->getMessageDeduplicationId() ?? false;
if ($messageDeDuplicationId) {
$arguments['MessageDeduplicationId'] = $messageDeDuplicationId;
}
if ($messageGroupId) {
$arguments['MessageGroupId'] = $messageGroupId;
}
}
try {
$result = $this->sns->publish($arguments);
$messageId = $result->getMessageId();
Expand Down
66 changes: 66 additions & 0 deletions tests/Functional/Service/Sns/SnsTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
use AsyncAws\Core\Test\ResultMockFactory;
use AsyncAws\Sns\Result\PublishResponse;
use AsyncAws\Sns\SnsClient;
use Bref\Symfony\Messenger\Service\Sns\SnsFifoStamp;
use Bref\Symfony\Messenger\Service\Sns\SnsTransport;
use Bref\Symfony\Messenger\Service\Sns\SnsTransportFactory;
use Bref\Symfony\Messenger\Test\Functional\BaseFunctionalTest;
use Bref\Symfony\Messenger\Test\Resources\TestMessage\TestMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class SnsTransportTest extends BaseFunctionalTest
{
Expand Down Expand Up @@ -53,4 +56,67 @@ public function test send message(): void
$bus = $this->container->get(MessageBusInterface::class);
$bus->dispatch(new TestMessage('hello'));
}

public function testRejectsMessageWhenQueueIsFifoWithoutStamp()
{
$snsClient = $this->getMockBuilder(SnsClient::class)->disableOriginalConstructor()->getMock();
$serializer = $this->container->get(SerializerInterface::class);
$snsTransport = new SnsTransport($snsClient, $serializer, "arn:aws:sns:us-east-1:1234567890:test.fifo'"); // fifo suffix designates fifo queue
$msg = new TestMessage("hello");
$envelope = new Envelope($msg);
$this->expectExceptionMessage("SnsFifoStamp required for fifo topic");
$snsTransport->send($envelope);
}
public function testAcceptsMessageWhenQueueIsFifoWithStamp(){
$snsClient = $this->getMockBuilder(SnsClient::class)->disableOriginalConstructor()->getMock();
$snsClient->expects($this->once())->method("publish")->willReturn(ResultMockFactory::create(PublishResponse::class, ['MessageId' => 4711]));
$serializer = $this->container->get(SerializerInterface::class);
$snsTransport = new SnsTransport($snsClient, $serializer, "arn:aws:sns:us-east-1:1234567890:test.fifo'"); // fifo suffix designates fifo queue
$msg = new TestMessage("hello");
$envelope = new Envelope($msg, [new SnsFifoStamp("123","456")]);
$resp = $snsTransport->send($envelope);
$this->assertInstanceOf(Envelope::class, $resp);
}
public function testAttachingSnsFifoStampToMessageAppliesMessageGroupId(){
$snsClient = $this->getMockBuilder(SnsClient::class)->disableOriginalConstructor()->getMock();
$snsClient->expects($this->once())->method("publish")
->with($this->callback(function($params){
$this->assertEquals("123", $params["MessageGroupId"]);
return true;
}))
->willReturn(ResultMockFactory::create(PublishResponse::class, ['MessageId' => 4711]));
$serializer = $this->container->get(SerializerInterface::class);
$snsTransport = new SnsTransport($snsClient, $serializer, "arn:aws:sns:us-east-1:1234567890:test.fifo'"); // fifo suffix designates fifo queue
$msg = new TestMessage("hello");
$envelope = new Envelope($msg, [new SnsFifoStamp("123","456")]);
$resp = $snsTransport->send($envelope);
$this->assertInstanceOf(Envelope::class, $resp);
}
public function testAttachingSnsFifoStampToMessageAppliesMessageDeDeuplicatId(){
$snsClient = $this->getMockBuilder(SnsClient::class)->disableOriginalConstructor()->getMock();
$snsClient->expects($this->once())->method("publish")
->with($this->callback(function($params){
$this->assertEquals("456", $params["MessageDeduplicationId"]);
return true;
}))
->willReturn(ResultMockFactory::create(PublishResponse::class, ['MessageId' => 4711]));
$serializer = $this->container->get(SerializerInterface::class);
$snsTransport = new SnsTransport($snsClient, $serializer, "arn:aws:sns:us-east-1:1234567890:test.fifo'"); // fifo suffix designates fifo queue
$msg = new TestMessage("hello");
$envelope = new Envelope($msg, [new SnsFifoStamp("123","456")]);
$resp = $snsTransport->send($envelope);
$this->assertInstanceOf(Envelope::class, $resp);
}
public function testAttachingSnsFifoStampToMessageAllowsNullMessageGroupId(){
// in fifo queues message group id can be null when the de-dupe scope is the entire queue.
$snsClient = $this->getMockBuilder(SnsClient::class)->disableOriginalConstructor()->getMock();
$snsClient->expects($this->once())->method("publish")
->willReturn(ResultMockFactory::create(PublishResponse::class, ['MessageId' => 4711]));
$serializer = $this->container->get(SerializerInterface::class);
$snsTransport = new SnsTransport($snsClient, $serializer, "arn:aws:sns:us-east-1:1234567890:test.fifo'"); // fifo suffix designates fifo queue
$msg = new TestMessage("hello");
$envelope = new Envelope($msg, [new SnsFifoStamp(null,"456")]);
$resp = $snsTransport->send($envelope);
$this->assertInstanceOf(Envelope::class, $resp);
}
}

0 comments on commit 229e7bc

Please sign in to comment.