
II. Getting Started

Gowri edited this page Dec 9, 2021 · 20 revisions

Key Concepts

  1. Async events: events that Magento dispatches when they are triggered.
  2. Subscribers: listeners that subscribe to one or more async events.

Defining an Async Event

Async events are defined in etc/async_events.xml. The definition only tells Magento that the async event exists; you still have to dispatch it yourself wherever you find suitable.

    <async_event name="sales.order.created">
        <service class="Magento\Sales\Api\OrderRepositoryInterface" method="get"/>
    </async_event>

  1. async_event
    • name - A unique name given to the asynchronous event.
  2. service
    • class - The class or interface that defines the handler.
    • method - The method that is executed; its return value is published to subscribers.

Dispatching Async Events

Events are dispatched by publishing to the EVENT_QUEUE using Magento\Framework\MessageQueue\PublisherInterface. The published message has to follow a specific structure.

Magento\Framework\MessageQueue\PublisherInterface::publish takes two arguments:

public function publish($topicName, $data);

The first argument, $topicName, SHOULD be the string defined by the constant \Aligent\AsyncEvents\Helper\QueueMetadataInterface::EVENT_QUEUE.

The second argument, $data, follows a specific structure. It should be an array of two strings:

  1. The first string specifies which async_event to dispatch (any event defined in async_events.xml).
  2. The second string SHOULD be a JSON-serialised string containing the named arguments of the service method that resolves the async event.

For example, if your service method is Magento\Sales\Api\OrderRepositoryInterface::get, which takes the following input:

/**
 * @param int $id The order ID.
 * @return \Magento\Sales\Api\Data\OrderInterface Order interface.
 */
public function get($id);

your $data array should look like this:

$arguments = ['id' => $orderId];
$data = ['sales.order.created', $this->json->serialize($arguments)];

This is likely to change in a future major version, where an AsyncEventMessageInterface would be passed instead of an array of two strings.
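The two-string structure described above can be sketched in plain PHP. This is a minimal illustration, not the module's code: buildAsyncEventMessage() is a hypothetical helper, json_encode() stands in for Magento's Json serializer, and the order ID 42 is an example value.

```php
<?php

declare(strict_types=1);

/**
 * Build the two-string message for an async event.
 *
 * Hypothetical helper for illustration; json_encode() stands in for
 * Magento's Json serializer.
 *
 * @param string $eventName Name defined in async_events.xml.
 * @param array<string, mixed> $arguments Named arguments of the service method.
 * @return string[] [event name, JSON-serialised arguments]
 */
function buildAsyncEventMessage(string $eventName, array $arguments): array
{
    return [$eventName, json_encode($arguments, JSON_THROW_ON_ERROR)];
}

// The key 'id' matches the $id parameter of OrderRepositoryInterface::get.
$data = buildAsyncEventMessage('sales.order.created', ['id' => 42]);
```

The resulting $data is what would be handed to PublisherInterface::publish as the second argument.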

Example:

    public function execute(Observer $observer): void
    {
        /** @var Order $object */
        $object = $observer->getEvent()->getData('order');

        $arguments = ['id' => $object->getId()];
        $data = ['sales.order.created', $this->json->serialize($arguments)];

        $this->publisher->publish(QueueMetadataInterface::EVENT_QUEUE, $data);
    }
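
To see why the keys of the serialised arguments have to match the parameter names of the service method, the following sketch matches decoded JSON keys to parameters by name. It is an illustration under assumptions, not how the module is known to resolve arguments: FakeOrderRepository and invokeWithNamedArguments() are hypothetical names.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a service such as OrderRepositoryInterface::get.
final class FakeOrderRepository
{
    public function get(int $id): string
    {
        return "order #{$id}";
    }
}

/**
 * Call $method on $service, matching decoded JSON keys to parameter names.
 * Hypothetical helper for illustration only.
 */
function invokeWithNamedArguments(object $service, string $method, string $json)
{
    $named = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $ordered = [];

    // Walk the method signature and pick each argument by its parameter name.
    foreach ((new ReflectionMethod($service, $method))->getParameters() as $parameter) {
        $name = $parameter->getName();
        if (array_key_exists($name, $named)) {
            $ordered[] = $named[$name];
        } elseif ($parameter->isDefaultValueAvailable()) {
            $ordered[] = $parameter->getDefaultValue();
        } else {
            throw new InvalidArgumentException("Missing argument: {$name}");
        }
    }

    return $service->$method(...$ordered);
}

$result = invokeWithNamedArguments(new FakeOrderRepository(), 'get', '{"id":42}');
```

With a key such as 'orderId' instead of 'id', this sketch throws, because no parameter of get() carries that name.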

Notifiers

Subscriptions may need to be handled differently, because each subscriber can have its own preferred way of receiving notifications.

Your notifiers must implement Aligent\AsyncEvents\Service\AsyncEvent\NotifierInterface.

HTTP Example

<?php

namespace Aligent\AsyncEvents\Service\AsyncEvent;

use Aligent\AsyncEvents\Api\Data\AsyncEventInterface;
use Aligent\AsyncEvents\Helper\NotifierResult;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException;
use Magento\Framework\Encryption\EncryptorInterface;
use Magento\Framework\Serialize\Serializer\Json;

/**
 * Class HttpNotifier
 *
 * This notifier serves as a bare-minimum example implementation for reference. You would usually create your own
 * factory and then derive your own implementations. However, if this one serves your purpose, you might as
 * well use it instead.
 *
 */
class HttpNotifier implements NotifierInterface
{
    /**
     * Hash algorithm to use to sign.
     */
    const HASHING_ALGORITHM = 'sha256';

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Json
     */
    private $json;

    /**
     * @var EncryptorInterface
     */
    private $encryptor;

    public function __construct(
        Client $client,
        Json $json,
        EncryptorInterface $encryptor
    ) {
        $this->client = $client;
        $this->json = $json;
        $this->encryptor = $encryptor;
    }

    /**
     * {@inheritDoc}
     */
    public function notify(AsyncEventInterface $asyncEvent, array $data): NotifierResult
    {
        $body = $data;

        // Sign the payload that the client can verify.
        $headers = [
            'x-magento-signature' => hash_hmac(
                self::HASHING_ALGORITHM,
                $this->json->serialize($body),
                $this->encryptor->decrypt($asyncEvent->getVerificationToken())
            )
        ];

        $notifierResult = new NotifierResult();
        $notifierResult->setSubscriptionId($asyncEvent->getSubscriptionId());

        try {
            $response = $this->client->post(
                $asyncEvent->getRecipientUrl(),
                [
                    'headers' => $headers,
                    'json' => $body,
                    'timeout' => 15,
                    'connect_timeout' => 5
                ]
            );

            $notifierResult->setSuccess(
                $response->getStatusCode() >= 200
                && $response->getStatusCode() < 300
            );

            $notifierResult->setResponseData(
                $this->json->serialize(
                    $response->getBody()->getContents()
                )
            );
        } catch (RequestException $exception) {
            /**
             * Catch a RequestException so we cover even the network layer exceptions which might sometimes
             * not have a response.
             */
            $notifierResult->setSuccess(false);

            if ($exception->hasResponse()) {
                $response = $exception->getResponse();
                $responseContent = $response->getBody()->getContents();
                $exceptionMessage = !empty($responseContent) ? $responseContent : $response->getReasonPhrase();

                $notifierResult->setResponseData(
                    $this->json->serialize(
                        $exceptionMessage
                    )
                );
            } else {
                $notifierResult->setResponseData(
                    $exception->getMessage()
                );
            }
        }

        return $notifierResult;
    }
}
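
On the receiving side, the x-magento-signature header can be checked by recomputing the HMAC over the request body with the shared verification token. The sketch below is a minimal example under assumptions: isValidSignature() is a hypothetical helper, the token and body are example values, and it assumes the bytes the receiver reads are identical to the serialised string that was signed.

```php
<?php

declare(strict_types=1);

/**
 * Check an x-magento-signature header against the raw request body.
 * Hypothetical receiver-side helper for illustration.
 */
function isValidSignature(string $rawBody, string $signatureHeader, string $verificationToken): bool
{
    // Same algorithm as HttpNotifier::HASHING_ALGORITHM.
    $expected = hash_hmac('sha256', $rawBody, $verificationToken);

    // hash_equals() compares in constant time to avoid leaking timing information.
    return hash_equals($expected, $signatureHeader);
}

// Simulate what the notifier would send; token and body are example values.
$token = 'example-shared-secret';
$body = json_encode(['id' => 42]);
$signature = hash_hmac('sha256', $body, $token);

$accepted = isValidSignature($body, $signature, $token);
$rejected = isValidSignature($body . ' ', $signature, $token);
```

In a real endpoint, the raw body would typically come from file_get_contents('php://input') and the header from $_SERVER['HTTP_X_MAGENTO_SIGNATURE']. Verify the signature before decoding the JSON, so that a re-encoded body cannot change the bytes being checked.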

EventBridge Example

This example uses the Magento EventBridge Notifier.

<?php

namespace Aligent\EventBridge\Service;

use Aligent\EventBridge\Model\Config as EventBridgeConfig;
use Aligent\AsyncEvents\Service\AsyncEvent\NotifierInterface;
use Aligent\AsyncEvents\Api\Data\AsyncEventInterface;
use Aligent\AsyncEvents\Helper\NotifierResult;
use Magento\Framework\Encryption\EncryptorInterface;
use Magento\Framework\Serialize\Serializer\Json;
use Aws\EventBridge\EventBridgeClient;
use Psr\Log\LoggerInterface;

/**
 * Class EventBridgeNotifier
 *
 * A notifier for relaying events into Amazon EventBridge.
 *
 */
class EventBridgeNotifier implements NotifierInterface
{
    /**
     * @var Json
     */
    private $json;

    /**
     * @var EncryptorInterface
     */
    private $encryptor;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var EventBridgeConfig
     */
    private $config;

    /**
     * @var EventBridgeClient
     */
    private $eventBridgeClient;

    public function __construct(
        Json $json,
        EncryptorInterface $encryptor,
        LoggerInterface $logger,
        EventBridgeConfig $config
    ) {
        $this->json = $json;
        $this->encryptor = $encryptor;
        $this->logger = $logger;
        $this->config = $config;

        $this->eventBridgeClient = new EventBridgeClient([
            'version' => '2015-10-07',
            'region' => $this->config->getAWSRegion(),
            'credentials' => [
                'key' => $this->config->getAWSKeyId(),
                'secret' => $this->encryptor->decrypt($this->config->getAWSSecretKey())
            ]
        ]);
    }

    /**
     * {@inheritDoc}
     */
    public function notify(AsyncEventInterface $asyncEvent, array $data): NotifierResult
    {
        $notifierResult = new NotifierResult();
        $notifierResult->setSubscriptionId($asyncEvent->getSubscriptionId());

        try {
            $result = $this->eventBridgeClient->putEvents([
                'Entries' => [
                    [
                        // Target the configured bus; previously this value was assigned but never sent.
                        'EventBusName' => $this->config->getEventBridgeBus(),
                        'Source' => $this->config->getEventBridgeSource(),
                        'Detail' => $this->json->serialize($data),
                        'DetailType' => $asyncEvent->getEventName(),
                        'Resources' => [],
                        'Time' => time()
                    ]
                ]
            ]);

            // PutEvents can succeed as a request while individual entries fail. Such failures are
            // reported through FailedEntryCount and the per-entry results in Entries.
            // https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-eventbridge-2015-10-07.html#putevents
            if (isset($result['FailedEntryCount']) && $result['FailedEntryCount'] > 0) {
                $notifierResult->setSuccess(false);

                // As we are only ever submitting one event at a time, assume that only one result
                // can be returned.
                $entry = $result['Entries'][0] ?? null;
                if ($entry !== null) {
                    $notifierResult->setResponseData(
                        $this->json->serialize($entry)
                    );
                }
            } else {
                $notifierResult->setSuccess(true);

                $notifierResult->setResponseData(
                    $this->json->serialize($result)
                );
            }
        } catch (\Exception $exception) {
            $this->logger->error($exception);

            $notifierResult->setSuccess(false);

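            // Serialising the exception object itself would yield an empty JSON object,
            // so store its message instead.
            $notifierResult->setResponseData(
                $this->json->serialize($exception->getMessage())
            );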
        }

        return $notifierResult;
    }
}

Creating Subscriptions

Finally, to receive notifications of async events, you have to create subscriptions. The module provides a REST API to create and manage them. An example subscription:

{
    "asyncEvent": {
        "event_name": "sales.order.created",
        "recipient_url": "https://example.com/order_created",
        "verification_token": "fv38u07Wdh$R@mRd",
        "metadata": "http"
    }
}