Skip to content

Commit

Permalink
Update documents after any product update on it
Browse files Browse the repository at this point in the history
  • Loading branch information
lruozzi9 committed May 2, 2024
1 parent 3f81ea7 commit fc619ac
Show file tree
Hide file tree
Showing 17 changed files with 395 additions and 13 deletions.
20 changes: 20 additions & 0 deletions config/services/event_subscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Webgriffe\SyliusElasticsearchPlugin\EventSubscriber\ProductEventSubscriber;

return static function (ContainerConfigurator $containerConfigurator) {
$services = $containerConfigurator->services();

$services->set('webgriffe.sylius_elasticsearch_plugin.event_subscriber.product', ProductEventSubscriber::class)
->args([
service('webgriffe_sylius_elasticsearch_plugin.command_bus'),
service('logger'),
service('sylius.repository.channel'),
])
->tag('kernel.event_subscriber')
;
};
10 changes: 10 additions & 0 deletions config/services/message_handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\CreateIndexHandler;
use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\UpsertDocumentHandler;

return static function (ContainerConfigurator $containerConfigurator) {
$services = $containerConfigurator->services();
Expand All @@ -17,4 +18,13 @@
])
->tag('messenger.message_handler')
;

$services->set('webgriffe.sylius_elasticsearch_plugin.message_handler.upsert_document', UpsertDocumentHandler::class)
->args([
service('sylius.repository.channel'),
service('webgriffe.sylius_elasticsearch_plugin.provider.document_type'),
service('webgriffe.sylius_elasticsearch_plugin.index_manager.elasticsearch'),
])
->tag('messenger.message_handler')
;
};
3 changes: 3 additions & 0 deletions src/Client/ClientInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Webgriffe\SyliusElasticsearchPlugin\Client\Exception\CreateIndexException;
use Webgriffe\SyliusElasticsearchPlugin\Client\Exception\RemoveIndexesException;
use Webgriffe\SyliusElasticsearchPlugin\Client\Exception\SwitchAliasException;
use Webgriffe\SyliusElasticsearchPlugin\Client\ValueObject\BulkAction;

/**
* @psalm-type ESValuesAggregation = array{doc_count_error_upper_bound: int, sum_other_doc_count: int, buckets: array<int, array{key: string, doc_count: int}>}
Expand Down Expand Up @@ -43,6 +44,8 @@ public function switchAlias(string $aliasName, string $toIndexName): void;
public function removeIndexes(string $wildcard = null, array $skips = []): void;

/**
* @param BulkAction[] $actions
*
* @return Generator<array-key, int> Number of executed actions at each step
*
* @throws BulkException
Expand Down
16 changes: 11 additions & 5 deletions src/Client/ElasticsearchClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,23 @@ public function bulk(string $indexName, array $actions): Generator
$params = ['body' => []];

$count = $totalCount = 0;
/** @var array $action */
foreach ($actions as $action) {
++$count;
++$totalCount;
$headerPayload = [
'_index' => $action->getIndex(),
];
if ($action->getType() !== null) {
$headerPayload['_type'] = $action->getType();
}
if ($action->getId() !== null) {
$headerPayload['_id'] = $action->getId();
}
$params['body'][] = [
'index' => [
'_index' => $indexName,
],
$action->getAction()->value => $headerPayload,
];

$params['body'][] = $action;
$params['body'][] = $action->getPayload();

// Every 250 actions stop and send the bulk request
if ($count % 250 === 0) {
Expand Down
13 changes: 13 additions & 0 deletions src/Client/Enum/Action.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Webgriffe\SyliusElasticsearchPlugin\Client\Enum;

enum Action: string
{
case CREATE = 'create';
case DELETE = 'delete';
case INDEX = 'index';
case UPDATE = 'update';
}
44 changes: 44 additions & 0 deletions src/Client/ValueObject/BulkAction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Webgriffe\SyliusElasticsearchPlugin\Client\ValueObject;

use Webgriffe\SyliusElasticsearchPlugin\Client\Enum\Action;

final readonly class BulkAction
{
public function __construct(
private Action $action,
private string $index,
private array $payload,
private ?string $type = null,
private null|string|int $id = null,
) {
}

public function getAction(): Action
{
return $this->action;
}

public function getIndex(): string
{
return $this->index;
}

public function getType(): ?string
{
return $this->type;
}

public function getPayload(): array
{
return $this->payload;
}

public function getId(): int|string|null
{
return $this->id;
}
}
12 changes: 12 additions & 0 deletions src/Doctrine/ORM/ProductDocumentTypeRepositoryTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,16 @@ public function findDocumentsToIndex(ChannelInterface $channel): array
->getResult()
;
}

public function findDocumentToIndex(string|int $identifier, ChannelInterface $channel): ?object
{
return $this->createQueryBuilder('p')
->where('p.id = :id')
->andWhere(':channel MEMBER OF p.channels')
->setParameter('id', $identifier)
->setParameter('channel', $channel)
->getQuery()
->getOneOrNullResult()
;
}
}
4 changes: 3 additions & 1 deletion src/DocumentType/DocumentTypeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface DocumentTypeInterface
public function getCode(): string;

/**
* @return array<array-key, mixed>
* @return array<array-key, array>
*/
public function getDocuments(ChannelInterface $channel): array;

Expand All @@ -20,4 +20,6 @@ public function getMappings(): array;

/** @return array<string, array> */
public function getSettings(): array;

public function getDocument(string|int $identifier, ChannelInterface $channel): array;
}
20 changes: 19 additions & 1 deletion src/DocumentType/ProductDocumentType.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Webgriffe\SyliusElasticsearchPlugin\DocumentType;

use InvalidArgumentException;
use Sylius\Component\Core\Model\ChannelInterface;
use Sylius\Component\Core\Model\ProductInterface;
use Sylius\Component\Locale\Model\LocaleInterface;
Expand Down Expand Up @@ -44,15 +45,32 @@ public function getDocuments(ChannelInterface $channel): array
$documents = [];
/** @var ProductInterface $documentToIndex */
foreach ($this->documentTypeRepository->findDocumentsToIndex($channel) as $documentToIndex) {
$documents[] = $this->normalizer->normalize($documentToIndex, null, [
$result = $this->normalizer->normalize($documentToIndex, null, [
'type' => 'webgriffe_sylius_elasticsearch_plugin',
'channel' => $channel,
]);
Assert::isArray($result);
$documents[] = $result;
}

return $documents;
}

public function getDocument(string|int $identifier, ChannelInterface $channel): array
{
$product = $this->documentTypeRepository->findDocumentToIndex($identifier, $channel);
if ($product === null) {
throw new InvalidArgumentException(sprintf('Product with identifier "%s" not found.', $identifier));
}
$result = $this->normalizer->normalize($product, null, [
'type' => 'webgriffe_sylius_elasticsearch_plugin',
'channel' => $channel,
]);
Assert::isArray($result);

return $result;
}

public function getSettings(): array
{
$settings = [
Expand Down
106 changes: 106 additions & 0 deletions src/EventSubscriber/ProductEventSubscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

declare(strict_types=1);

namespace Webgriffe\SyliusElasticsearchPlugin\EventSubscriber;

use Psr\Log\LoggerInterface;
use Sylius\Component\Channel\Repository\ChannelRepositoryInterface;
use Sylius\Component\Core\Model\ChannelInterface;
use Sylius\Component\Core\Model\ProductInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\EventDispatcher\GenericEvent;
use Symfony\Component\Messenger\MessageBusInterface;
use Throwable;
use Webgriffe\SyliusElasticsearchPlugin\DocumentType\ProductDocumentType;
use Webgriffe\SyliusElasticsearchPlugin\Message\RemoveDocumentIfExists;
use Webgriffe\SyliusElasticsearchPlugin\Message\UpsertDocument;

final readonly class ProductEventSubscriber implements EventSubscriberInterface
{
public function __construct(
private MessageBusInterface $messageBus,
private LoggerInterface $logger,
private ChannelRepositoryInterface $channelRepository,
) {
}

public static function getSubscribedEvents(): array
{
return [
'sylius.product.post_create' => 'onProductPostCreateOrUpdate',
'sylius.product.post_update' => 'onProductPostCreateOrUpdate',
'sylius.product.post_delete' => 'onProductPostDelete',
];
}

public function onProductPostCreateOrUpdate(GenericEvent $event): void
{
$product = $event->getSubject();
if (!$product instanceof ProductInterface) {
return;
}
$productId = $product->getId();
if (!is_int($productId) && !is_string($productId)) {
return;
}

try {
/** @var array<array-key, ChannelInterface> $allChannels */
$allChannels = $this->channelRepository->findAll();
$productChannels = $product->getChannels();
foreach ($allChannels as $channel) {
$channelId = $channel->getId();
if (!is_int($channelId) && !is_string($channelId)) {
continue;
}
if ($productChannels->contains($channel)) {
$this->messageBus->dispatch(new UpsertDocument(
$channelId,
ProductDocumentType::CODE,
$productId,
));

continue;
}
$this->messageBus->dispatch(new RemoveDocumentIfExists(
$channelId,
ProductDocumentType::CODE,
$productId,
));
}
} catch (Throwable $throwable) {
$this->logger->error($throwable->getMessage(), $throwable->getTrace());
}
}

public function onProductPostDelete(GenericEvent $event): void
{
$product = $event->getSubject();
if (!$product instanceof ProductInterface) {
return;
}
$productId = $product->getId();
if (!is_int($productId) && !is_string($productId)) {
return;
}

try {
/** @var array<array-key, ChannelInterface> $allChannels */
$allChannels = $this->channelRepository->findAll();
foreach ($allChannels as $channel) {
$channelId = $channel->getId();
if (!is_int($channelId) && !is_string($channelId)) {
continue;
}
$this->messageBus->dispatch(new RemoveDocumentIfExists(
$channelId,
ProductDocumentType::CODE,
$productId,
));
}
} catch (Throwable $throwable) {
$this->logger->error($throwable->getMessage(), $throwable->getTrace());
}
}
}
42 changes: 37 additions & 5 deletions src/IndexManager/ElasticsearchIndexManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Generator;
use Sylius\Component\Core\Model\ChannelInterface;
use Webgriffe\SyliusElasticsearchPlugin\Client\ClientInterface;
use Webgriffe\SyliusElasticsearchPlugin\Client\Enum\Action;
use Webgriffe\SyliusElasticsearchPlugin\Client\ValueObject\BulkAction;
use Webgriffe\SyliusElasticsearchPlugin\DocumentType\DocumentTypeInterface;
use Webgriffe\SyliusElasticsearchPlugin\Generator\IndexNameGeneratorInterface;
use Webgriffe\SyliusElasticsearchPlugin\IndexManager\Output\Message;
Expand All @@ -31,18 +33,48 @@ public function create(ChannelInterface $channel, DocumentTypeInterface $documen
// @TODO: speed up this step by introducing a batch size
yield Message::createMessage('Retrieving normalized documents to populate the index.');
$documents = $documentType->getDocuments($channel);
$bulkActions = [];
foreach ($documents as $document) {
$syliusId = null;
if (array_key_exists('sylius-id', $document)) {
/** @var string|int $syliusId */
$syliusId = $document['sylius-id'];
}
$bulkActions[] = new BulkAction(Action::CREATE, $indexName, $document, null, $syliusId);
}

$countDocuments = count($documents);
yield Message::createMessage(sprintf('Indexed 0/%d documents.', $countDocuments));
foreach ($this->client->bulk($indexName, $documents) as $documentsIndexed) {
yield Message::createMessage(sprintf('Indexed %d/%d documents.', $documentsIndexed, $countDocuments));
$countBulkActions = count($bulkActions);
yield Message::createMessage(sprintf('Indexed 0/%d documents.', $countBulkActions));
foreach ($this->client->bulk($indexName, $bulkActions) as $documentsIndexed) {
yield Message::createMessage(sprintf('Indexed %d/%d documents.', $documentsIndexed, $countBulkActions));
}
yield Message::createMessage(sprintf('Populated index "%s" with %d documents.', $indexName, $countDocuments));
yield Message::createMessage(sprintf('Populated index "%s" with %d documents.', $indexName, $countBulkActions));

yield Message::createMessage(sprintf('Switching alias "%s" to index "%s".', $aliasName, $indexName));
$this->client->switchAlias($aliasName, $indexName);

yield Message::createMessage(sprintf('Removing old indexes responding to wildcard "%s".', $indexesToRemoveWildcard));
$this->client->removeIndexes($indexesToRemoveWildcard, [$indexName]);
}

public function upsertDocument(
ChannelInterface $channel,
DocumentTypeInterface $documentType,
string|int ...$identifiers,
): Generator {
$aliasName = $this->indexNameGenerator->generateAlias($channel, $documentType);

yield Message::createMessage('Retrieving normalized document to update the index.');
$bulkActions = [];
foreach ($identifiers as $identifier) {
$bulkActions[] = new BulkAction(Action::INDEX, $aliasName, $documentType->getDocument($identifier, $channel), null, $identifier);
}

$countBulkActions = count($bulkActions);
yield Message::createMessage(sprintf('Indexed 0/%d documents.', $countBulkActions));
foreach ($this->client->bulk($aliasName, $bulkActions) as $documentsIndexed) {
yield Message::createMessage(sprintf('Indexed %d/%d documents.', $documentsIndexed, $countBulkActions));
}
yield Message::createMessage(sprintf('Updated %d documents in alias "%s".', $countBulkActions, $aliasName));
}
}
Loading

0 comments on commit fc619ac

Please sign in to comment.