diff --git a/config/services/message_handler.php b/config/services/message_handler.php index a073036..2ba4186 100644 --- a/config/services/message_handler.php +++ b/config/services/message_handler.php @@ -5,6 +5,7 @@ namespace Symfony\Component\DependencyInjection\Loader\Configurator; use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\CreateIndexHandler; +use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\RemoveDocumentIfExistsHandler; use Webgriffe\SyliusElasticsearchPlugin\MessageHandler\UpsertDocumentHandler; return static function (ContainerConfigurator $containerConfigurator) { @@ -27,4 +28,13 @@ ]) ->tag('messenger.message_handler') ; + + $services->set('webgriffe.sylius_elasticsearch_plugin.message_handler.remove_document_if_exists', RemoveDocumentIfExistsHandler::class) + ->args([ + service('sylius.repository.channel'), + service('webgriffe.sylius_elasticsearch_plugin.provider.document_type'), + service('webgriffe.sylius_elasticsearch_plugin.index_manager.elasticsearch'), + ]) + ->tag('messenger.message_handler') + ; }; diff --git a/src/Client/ElasticsearchClient.php b/src/Client/ElasticsearchClient.php index 8ab5cf6..2b853f2 100644 --- a/src/Client/ElasticsearchClient.php +++ b/src/Client/ElasticsearchClient.php @@ -89,7 +89,9 @@ public function bulk(string $indexName, array $actions): Generator $action->getAction()->value => $headerPayload, ]; - $params['body'][] = $action->getPayload(); + if ($action->getPayload() !== null) { + $params['body'][] = $action->getPayload(); + } // Every 250 actions stop and send the bulk request if ($count % 250 === 0) { diff --git a/src/Client/ValueObject/BulkAction.php b/src/Client/ValueObject/BulkAction.php index 8391ef4..dc509b2 100644 --- a/src/Client/ValueObject/BulkAction.php +++ b/src/Client/ValueObject/BulkAction.php @@ -11,7 +11,7 @@ public function __construct( private Action $action, private string $index, - private array $payload, + private ?array $payload = null, private ?string $type = null, private null|string|int $id = null, ) { @@ -32,7 +32,7 @@ public function getType(): ?string return $this->type; } - public function getPayload(): array + public function getPayload(): ?array { return $this->payload; } diff --git a/src/EventSubscriber/ProductEventSubscriber.php b/src/EventSubscriber/ProductEventSubscriber.php index 5c65d42..dc5b2b9 100644 --- a/src/EventSubscriber/ProductEventSubscriber.php +++ b/src/EventSubscriber/ProductEventSubscriber.php @@ -30,7 +30,7 @@ public static function getSubscribedEvents(): array return [ 'sylius.product.post_create' => 'onProductPostCreateOrUpdate', 'sylius.product.post_update' => 'onProductPostCreateOrUpdate', - 'sylius.product.post_delete' => 'onProductPostDelete', + 'sylius.product.pre_delete' => 'onProductPreDelete', ]; } @@ -74,7 +74,7 @@ public function onProductPostCreateOrUpdate(GenericEvent $event): void } } - public function onProductPostDelete(GenericEvent $event): void + public function onProductPreDelete(GenericEvent $event): void { $product = $event->getSubject(); if (!$product instanceof ProductInterface) { diff --git a/src/IndexManager/ElasticsearchIndexManager.php b/src/IndexManager/ElasticsearchIndexManager.php index 444669b..e9a4d2e 100644 --- a/src/IndexManager/ElasticsearchIndexManager.php +++ b/src/IndexManager/ElasticsearchIndexManager.php @@ -57,7 +57,7 @@ public function create(ChannelInterface $channel, DocumentTypeInterface $documen $this->client->removeIndexes($indexesToRemoveWildcard, [$indexName]); } - public function upsertDocument( + public function upsertDocuments( ChannelInterface $channel, DocumentTypeInterface $documentType, string|int ...$identifiers, @@ -77,4 +77,24 @@ public function upsertDocument( } yield Message::createMessage(sprintf('Updated %d documents in alias "%s".', $countBulkActions, $aliasName)); } + + public function removeDocuments( + ChannelInterface $channel, + DocumentTypeInterface $documentType, + int|string ...$identifiers, + ): Generator { + $aliasName = $this->indexNameGenerator->generateAlias($channel, $documentType); + + $bulkActions = []; + foreach ($identifiers as $identifier) { + $bulkActions[] = new BulkAction(Action::DELETE, $aliasName, null, null, $identifier); + } + + $countBulkActions = count($bulkActions); + yield Message::createMessage(sprintf('Removed 0/%d documents.', $countBulkActions)); + foreach ($this->client->bulk($aliasName, $bulkActions) as $documentsIndexed) { + yield Message::createMessage(sprintf('Removed %d/%d documents.', $documentsIndexed, $countBulkActions)); + } + yield Message::createMessage(sprintf('Removed %d documents in alias "%s".', $countBulkActions, $aliasName)); + } } diff --git a/src/IndexManager/IndexManagerInterface.php b/src/IndexManager/IndexManagerInterface.php index 3fa9b45..3100cd7 100644 --- a/src/IndexManager/IndexManagerInterface.php +++ b/src/IndexManager/IndexManagerInterface.php @@ -19,9 +19,18 @@ public function create(ChannelInterface $channel, DocumentTypeInterface $documen /** * @return Generator */ - public function upsertDocument( + public function upsertDocuments( ChannelInterface $channel, DocumentTypeInterface $documentType, string|int ...$identifiers, ): Generator; + + /** + * @return Generator + */ + public function removeDocuments( + ChannelInterface $channel, + DocumentTypeInterface $documentType, + int|string ...$identifiers, + ): Generator; } diff --git a/src/MessageHandler/RemoveDocumentIfExistsHandler.php b/src/MessageHandler/RemoveDocumentIfExistsHandler.php new file mode 100644 index 0000000..5066411 --- /dev/null +++ b/src/MessageHandler/RemoveDocumentIfExistsHandler.php @@ -0,0 +1,41 @@ +channelRepository->find($message->getChannelId()); + if (!$channel instanceof ChannelInterface) { + throw new InvalidArgumentException(sprintf( + 'Channel with id "%s" does not exist.', + $message->getChannelId(), + )); + } + + $documentType = $this->documentTypeProvider->getDocumentType($message->getDocumentTypeCode()); + + foreach ($this->indexManager->removeDocuments($channel, $documentType, $message->getDocumentIdentifier()) as $outputMessage) { + } + } +} diff --git a/src/MessageHandler/UpsertDocumentHandler.php b/src/MessageHandler/UpsertDocumentHandler.php index 1e99a63..5f4a720 100644 --- a/src/MessageHandler/UpsertDocumentHandler.php +++ b/src/MessageHandler/UpsertDocumentHandler.php @@ -35,7 +35,7 @@ public function __invoke(UpsertDocument $message): void $documentType = $this->documentTypeProvider->getDocumentType($message->getDocumentTypeCode()); - foreach ($this->indexManager->upsertDocument($channel, $documentType, $message->getDocumentIdentifier()) as $outputMessage) { + foreach ($this->indexManager->upsertDocuments($channel, $documentType, $message->getDocumentIdentifier()) as $outputMessage) { } } }