Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
4rthem committed Apr 22, 2024
1 parent 92aa758 commit 9121a93
Show file tree
Hide file tree
Showing 107 changed files with 981 additions and 999 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Alchemy\StorageBundle\Upload\UploadManager;
use Alchemy\StorageBundle\Util\FileUtil;
use App\Api\Model\Input\AssetSourceInput;
use App\Consumer\Handler\File\ImportFile;
use App\Consumer\Handler\File\ImportFileHandler;
use App\Entity\Core\File;
use App\Entity\Core\Workspace;
Expand Down Expand Up @@ -96,8 +97,7 @@ protected function handleSource(?AssetSourceInput $source, Workspace $workspace)
$this->em->persist($file);

if ($source->importFile) {
$this->postFlushStackListener
->addEvent(ImportFileHandler::createEvent($file->getId()));
$this->postFlushStackListener->addBusMessage(new ImportFile($file->getId()));
}

return $file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use App\Api\Processor\WithOwnerIdProcessorTrait;
use App\Asset\AssetManager;
use App\Asset\OriginalRenditionManager;
use App\Consumer\Handler\File\CopyFileToAsset;
use App\Consumer\Handler\File\CopyFileToAssetHandler;
use App\Entity\Core\Asset;
use App\Entity\Core\AssetRelationship;
Expand Down Expand Up @@ -161,7 +162,7 @@ private function handleFile(AssetInput $data, Asset $asset): ?File
if (null !== $file = $this->handleSource($data->sourceFile, $asset->getWorkspace())) {
return $file;
} elseif (null !== $file = $this->handleFromFile($data->sourceFileId)) {
$this->postFlushStackListener->addEvent(CopyFileToAssetHandler::createEvent($asset->getId(), $file->getId()));
$this->postFlushStackListener->addBusMessage(new CopyFileToAsset($asset->getId(), $file->getId()));

return $file;
} elseif (null !== $file = $this->handleUpload($asset->getWorkspace())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace App\Api\InputTransformer;

use App\Api\Model\Input\RenditionInput;
use App\Consumer\Handler\File\CopyFileToRendition;
use App\Consumer\Handler\File\CopyFileToRenditionHandler;
use App\Entity\Core\Asset;
use App\Entity\Core\AssetRendition;
Expand Down Expand Up @@ -46,7 +47,7 @@ public function transform(object $data, string $resourceClass, array $context =
if (null !== $file = $this->handleSource($data->sourceFile, $workspace)) {
$object->setFile($file);
} elseif (null !== $file = $this->handleFromFile($data->sourceFileId)) {
$this->postFlushStackListener->addEvent(CopyFileToRenditionHandler::createEvent($object->getId(), $file->getId()));
$this->postFlushStackListener->addBusMessage(new CopyFileToRendition($object->getId(), $file->getId()));
$object->setFile($file);
} elseif (null !== $file = $this->handleUpload($workspace)) {
$object->setFile($file);
Expand Down
8 changes: 4 additions & 4 deletions databox/api/src/Api/Processor/CopyAssetProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
use ApiPlatform\State\ProcessorInterface;
use App\Api\Model\Input\CopyAssetInput;
use App\Asset\AssetCopier;
use App\Consumer\Handler\Asset\AssetCopyHandler;
use App\Consumer\Handler\Asset\AssetCopy;
use App\Entity\Core\Asset;
use App\Security\Voter\AbstractVoter;
use App\Util\SecurityAwareTrait;
use Arthem\Bundle\RabbitBundle\Producer\EventProducer;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class CopyAssetProcessor implements ProcessorInterface
{
use SecurityAwareTrait;
use WithOwnerIdProcessorTrait;

public function __construct(
private readonly EventProducer $eventProducer,
private readonly MessageBusInterface $bus,
private readonly EntityManagerInterface $em,
private readonly IriConverterInterface $iriConverter
) {
Expand Down Expand Up @@ -58,7 +58,7 @@ public function process($data, Operation $operation, array $uriVariables = [], a
$this->denyAccessUnlessGranted(AbstractVoter::READ, $asset);
$symlink = $data->byReference && $this->isGranted(AbstractVoter::EDIT, $asset);

$this->eventProducer->publish(AssetCopyHandler::createEvent(
$this->bus->dispatch(new AssetCopy(
$userId,
$userGroups,
$asset->getId(),
Expand Down
9 changes: 4 additions & 5 deletions databox/api/src/Api/Processor/IncomingUploadProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
use ApiPlatform\Metadata\Operation;
use ApiPlatform\Metadata\Post;
use ApiPlatform\State\ProcessorInterface;
use App\Border\Consumer\Handler\Uploader\UploaderNewCommitHandler;
use App\Border\Consumer\Handler\Uploader\UploaderNewCommit;
use App\Border\Model\Upload\IncomingUpload;
use Arthem\Bundle\RabbitBundle\Consumer\Event\EventMessage;
use Arthem\Bundle\RabbitBundle\Producer\EventProducer;
use Symfony\Component\Messenger\MessageBusInterface;

readonly class IncomingUploadProcessor implements ProcessorInterface
{
public function __construct(private EventProducer $eventProducer)
public function __construct(private MessageBusInterface $bus)
{
}

Expand All @@ -24,7 +23,7 @@ public function __construct(private EventProducer $eventProducer)
public function process($data, Operation $operation, array $uriVariables = [], array $context = []): IncomingUpload
{
if ($operation instanceof Post) {
$this->eventProducer->publish(new EventMessage(UploaderNewCommitHandler::EVENT, $data->toArray()));
$this->bus->dispatch(new UploaderNewCommit($data->toArray()));
}

return $data;
Expand Down
7 changes: 4 additions & 3 deletions databox/api/src/Api/Processor/MoveAssetProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
use ApiPlatform\Metadata\Operation;
use ApiPlatform\State\ProcessorInterface;
use App\Api\Model\Input\MoveAssetInput;
use App\Consumer\Handler\Asset\AssetMove;
use App\Consumer\Handler\Asset\AssetMoveHandler;
use App\Entity\Core\Asset;
use App\Security\Voter\AbstractVoter;
use App\Util\SecurityAwareTrait;
use Arthem\Bundle\RabbitBundle\Producer\EventProducer;
use Symfony\Component\Messenger\MessageBusInterface;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\HttpFoundation\Response;

Expand All @@ -21,7 +22,7 @@ class MoveAssetProcessor implements ProcessorInterface
use SecurityAwareTrait;

public function __construct(
private readonly EventProducer $eventProducer,
private readonly MessageBusInterface $bus,
private readonly EntityManagerInterface $em,
private readonly IriConverterInterface $iriConverter
) {
Expand All @@ -40,7 +41,7 @@ public function process($data, Operation $operation, array $uriVariables = [], a

foreach ($assets as $asset) {
$this->denyAccessUnlessGranted(AbstractVoter::EDIT, $asset);
$this->eventProducer->publish(AssetMoveHandler::createEvent($asset->getId(), $data->destination));
$this->bus->dispatch(new AssetMove($asset->getId(), $data->destination));
}

return new Response('', 204);
Expand Down
4 changes: 2 additions & 2 deletions databox/api/src/Api/Processor/MoveCollectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use App\Entity\Core\Collection;
use App\Security\Voter\AbstractVoter;
use App\Util\SecurityAwareTrait;
use Arthem\Bundle\RabbitBundle\Producer\EventProducer;
use Symfony\Component\Messenger\MessageBusInterface;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\NotFoundHttpException;
Expand All @@ -20,7 +20,7 @@ class MoveCollectionProcessor implements ProcessorInterface
use SecurityAwareTrait;

public function __construct(
private readonly EventProducer $eventProducer,
private readonly MessageBusInterface $bus,
private readonly EntityManagerInterface $em,
private readonly IriConverterInterface $iriConverter
) {
Expand Down
7 changes: 4 additions & 3 deletions databox/api/src/Asset/AssetCopier.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace App\Asset;

use App\Consumer\Handler\File\NewAssetFromBorder;
use App\Consumer\Handler\File\NewAssetFromBorderHandler;
use App\Entity\Core\Asset;
use App\Entity\Core\AssetRendition;
Expand All @@ -12,7 +13,7 @@
use App\Entity\Core\File;
use App\Entity\Core\Workspace;
use App\Security\RenditionPermissionManager;
use Arthem\Bundle\RabbitBundle\Producer\EventProducer;
use Symfony\Component\Messenger\MessageBusInterface;
use Doctrine\ORM\EntityManagerInterface;

class AssetCopier
Expand All @@ -22,7 +23,7 @@ class AssetCopier

private array $fileCopies = [];

public function __construct(private readonly EventProducer $eventProducer, private readonly EntityManagerInterface $em, private readonly RenditionPermissionManager $renditionPermissionManager, private readonly FileCopier $fileCopier)
public function __construct(private readonly MessageBusInterface $bus, private readonly EntityManagerInterface $em, private readonly RenditionPermissionManager $renditionPermissionManager, private readonly FileCopier $fileCopier)
{
}

Expand Down Expand Up @@ -52,7 +53,7 @@ public function copyAsset(
$file = $this->copyFile($asset->getSource(), $workspace);
$this->em->flush();

$this->eventProducer->publish(NewAssetFromBorderHandler::createEvent(
$this->bus->dispatch(new NewAssetFromBorder(
$userId,
$file->getId(),
$collection ? [$collection->getId()] : [],
Expand Down
3 changes: 2 additions & 1 deletion databox/api/src/Attribute/BatchAttributeManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Alchemy\MessengerBundle\Listener\PostFlushStack;
use App\Api\Model\Input\Attribute\AssetAttributeBatchUpdateInput;
use App\Api\Model\Input\Attribute\AttributeActionInput;
use App\Consumer\Handler\Asset\AttributeChanged;
use App\Consumer\Handler\Asset\AttributeChangedEventHandler;
use App\Entity\Core\Asset;
use App\Entity\Core\Attribute;
Expand Down Expand Up @@ -269,7 +270,7 @@ public function handleBatch(
$this->deferredIndexListener->scheduleForUpdate($this->em->getReference(Asset::class, $assetId));

if ($dispatchUpdateEvent) {
$this->postFlushStack->addEvent(AttributeChangedEventHandler::createEvent(
$this->postFlushStack->addBusMessage(new AttributeChanged(
$attributes,
$assetId,
$user?->getId(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace App\Border\Consumer\Handler\Uploader;

final readonly class UploaderNewCommit
{
public function __construct(
private array $payload
) {
}

public function getPayload(): array
{
return $this->payload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@
use Alchemy\Workflow\WorkflowOrchestrator;
use App\Border\Model\Upload\IncomingUpload;
use App\Workflow\Event\IncomingUploaderFileWorkflowEvent;
use Arthem\Bundle\RabbitBundle\Consumer\Event\AbstractEntityManagerHandler;
use Arthem\Bundle\RabbitBundle\Consumer\Event\EventMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

class UploaderNewCommitHandler extends AbstractEntityManagerHandler
#[AsMessageHandler]
final readonly class UploaderNewCommitHandler
{
final public const EVENT = 'uploader_new_commit';

public function __construct(private readonly WorkflowOrchestrator $workflowOrchestrator)
{
public function __construct(
private WorkflowOrchestrator $workflowOrchestrator
) {
}

public function handle(EventMessage $message): void
public function __invoke(UploaderNewCommit $message): void
{
$upload = IncomingUpload::fromArray($message->getPayload());

Expand All @@ -30,9 +29,4 @@ public function handle(EventMessage $message): void
));
}
}

public static function getHandledEvents(): array
{
return [self::EVENT];
}
}
19 changes: 13 additions & 6 deletions databox/api/src/Consumer/Handler/AbstractBatchHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

namespace App\Consumer\Handler;

use Arthem\Bundle\RabbitBundle\Consumer\Event\AbstractEntityManagerHandler;
use Arthem\Bundle\RabbitBundle\Consumer\Event\EventMessage;
use Alchemy\ESBundle\Indexer\SearchIndexer;
use Doctrine\ORM\EntityManagerInterface;

abstract class AbstractBatchHandler extends AbstractEntityManagerHandler
abstract readonly class AbstractBatchHandler
{
public function handle(EventMessage $message): void
public function __construct(
protected SearchIndexer $searchIndexer,
protected EntityManagerInterface $em,
)
{
$iterator = $this->getIterator($message);
}

protected function doHandle(): void
{
$iterator = $this->getIterator();
$batchSize = $this->getBatchSize();

$stack = [];
Expand All @@ -30,7 +37,7 @@ public function handle(EventMessage $message): void
}
}

abstract protected function getIterator(EventMessage $message): iterable;
abstract protected function getIterator(): iterable;

abstract protected function flushIndexStack(array $stack): void;

Expand Down
47 changes: 47 additions & 0 deletions databox/api/src/Consumer/Handler/Asset/AssetCopy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace App\Consumer\Handler\Asset;

final readonly class AssetCopy
{
public function __construct(
private string $userId,
private array $groupsId,
private string $id,
private string $destination,
private ?bool $link = null,
private array $options = []
) {

}

public function getUserId(): string
{
return $this->userId;
}

public function getGroupsId(): array
{
return $this->groupsId;
}

public function getId(): string
{
return $this->id;
}

public function getDestination(): string
{
return $this->destination;
}

public function getLink(): ?bool
{
return $this->link;
}

public function getOptions(): array
{
return $this->options;
}
}
Loading

0 comments on commit 9121a93

Please sign in to comment.