Skip to content

Commit

Permalink
Update documents after any product remove on it
Browse files Browse the repository at this point in the history
  • Loading branch information
lruozzi9 committed May 2, 2024
1 parent fc619ac commit 382afac
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 8 deletions.
10 changes: 10 additions & 0 deletions config/services/message_handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\CreateIndexHandler;
use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\RemoveDocumentIfExistsHandler;
use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\UpsertDocumentHandler;

return static function (ContainerConfigurator $containerConfigurator) {
Expand All @@ -27,4 +28,13 @@
])
->tag('messenger.message_handler')
;

$services->set('webgriffe.sylius_elasticsearch_plugin.message_handler.remove_document_if_exists', RemoveDocumentIfExistsHandler::class)
->args([
service('sylius.repository.channel'),
service('webgriffe.sylius_elasticsearch_plugin.provider.document_type'),
service('webgriffe.sylius_elasticsearch_plugin.index_manager.elasticsearch'),
])
->tag('messenger.message_handler')
;
};
4 changes: 3 additions & 1 deletion src/Client/ElasticsearchClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public function bulk(string $indexName, array $actions): Generator
$action->getAction()->value => $headerPayload,
];

$params['body'][] = $action->getPayload();
if ($action->getPayload() !== null) {
$params['body'][] = $action->getPayload();
}

// Every 250 actions stop and send the bulk request
if ($count % 250 === 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/Client/ValueObject/BulkAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public function __construct(
private Action $action,
private string $index,
private array $payload,
private ?array $payload = null,
private ?string $type = null,
private null|string|int $id = null,
) {
Expand All @@ -32,7 +32,7 @@ public function getType(): ?string
return $this->type;
}

public function getPayload(): array
public function getPayload(): ?array
{
return $this->payload;
}
Expand Down
4 changes: 2 additions & 2 deletions src/EventSubscriber/ProductEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static function getSubscribedEvents(): array
return [
'sylius.product.post_create' => 'onProductPostCreateOrUpdate',
'sylius.product.post_update' => 'onProductPostCreateOrUpdate',
'sylius.product.post_delete' => 'onProductPostDelete',
'sylius.product.pre_delete' => 'onProductPreDelete',
];
}

Expand Down Expand Up @@ -74,7 +74,7 @@ public function onProductPostCreateOrUpdate(GenericEvent $event): void
}
}

public function onProductPostDelete(GenericEvent $event): void
public function onProductPreDelete(GenericEvent $event): void
{
$product = $event->getSubject();
if (!$product instanceof ProductInterface) {
Expand Down
22 changes: 21 additions & 1 deletion src/IndexManager/ElasticsearchIndexManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public function create(ChannelInterface $channel, DocumentTypeInterface $documen
$this->client->removeIndexes($indexesToRemoveWildcard, [$indexName]);
}

public function upsertDocument(
public function upsertDocuments(
ChannelInterface $channel,
DocumentTypeInterface $documentType,
string|int ...$identifiers,
Expand All @@ -77,4 +77,24 @@ public function upsertDocument(
}
yield Message::createMessage(sprintf('Updated %d documents in alias "%s".', $countBulkActions, $aliasName));
}

public function removeDocuments(
ChannelInterface $channel,
DocumentTypeInterface $documentType,
int|string ...$identifiers,
): Generator {
$aliasName = $this->indexNameGenerator->generateAlias($channel, $documentType);

$bulkActions = [];
foreach ($identifiers as $identifier) {
$bulkActions[] = new BulkAction(Action::DELETE, $aliasName, null, null, $identifier);
}

$countBulkActions = count($bulkActions);
yield Message::createMessage(sprintf('Removed 0/%d documents.', $countBulkActions));
foreach ($this->client->bulk($aliasName, $bulkActions) as $documentsIndexed) {
yield Message::createMessage(sprintf('Removed %d/%d documents.', $documentsIndexed, $countBulkActions));
}
yield Message::createMessage(sprintf('Removed %d documents in alias "%s".', $countBulkActions, $aliasName));
}
}
11 changes: 10 additions & 1 deletion src/IndexManager/IndexManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ public function create(ChannelInterface $channel, DocumentTypeInterface $documen
/**
* @return Generator<array-key, MessageInterface>
*/
public function upsertDocument(
public function upsertDocuments(
ChannelInterface $channel,
DocumentTypeInterface $documentType,
string|int ...$identifiers,
): Generator;

/**
* @return Generator<array-key, MessageInterface>
*/
public function removeDocuments(
ChannelInterface $channel,
DocumentTypeInterface $documentType,
int|string ...$identifiers,
): Generator;
}
41 changes: 41 additions & 0 deletions src/MessageHandler/RemoveDocumentIfExistsHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Webgriffe\SyliusElasticsearchPlugin\MessageHandler;

use InvalidArgumentException;
use Sylius\Component\Channel\Repository\ChannelRepositoryInterface;
use Sylius\Component\Core\Model\ChannelInterface;
use Webgriffe\SyliusElasticsearchPlugin\IndexManager\IndexManagerInterface;
use Webgriffe\SyliusElasticsearchPlugin\Message\RemoveDocumentIfExists;
use Webgriffe\SyliusElasticsearchPlugin\Provider\DocumentTypeProviderInterface;

final readonly class RemoveDocumentIfExistsHandler
{
public function __construct(
private ChannelRepositoryInterface $channelRepository,
private DocumentTypeProviderInterface $documentTypeProvider,
private IndexManagerInterface $indexManager,
) {
}

/**
* @psalm-suppress UnusedForeachValue
*/
public function __invoke(RemoveDocumentIfExists $message): void
{
$channel = $this->channelRepository->find($message->getChannelId());
if (!$channel instanceof ChannelInterface) {
throw new InvalidArgumentException(sprintf(
'Channel with id "%s" does not exist.',
$message->getChannelId(),
));
}

$documentType = $this->documentTypeProvider->getDocumentType($message->getDocumentTypeCode());

foreach ($this->indexManager->removeDocuments($channel, $documentType, $message->getDocumentIdentifier()) as $outputMessage) {
}
}
}
2 changes: 1 addition & 1 deletion src/MessageHandler/UpsertDocumentHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function __invoke(UpsertDocument $message): void

$documentType = $this->documentTypeProvider->getDocumentType($message->getDocumentTypeCode());

foreach ($this->indexManager->upsertDocument($channel, $documentType, $message->getDocumentIdentifier()) as $outputMessage) {
foreach ($this->indexManager->upsertDocuments($channel, $documentType, $message->getDocumentIdentifier()) as $outputMessage) {
}
}
}

0 comments on commit 382afac

Please sign in to comment.