Skip to content

Commit

Permalink
add new show command
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Aug 16, 2023
1 parent 22a002a commit 957b166
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 45 deletions.
90 changes: 90 additions & 0 deletions src/Console/Command/ShowAggregateCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Question\Question;

use function array_values;
use function sprintf;

#[AsCommand(
'event-sourcing:show-aggregate',
'show events from one aggregate',
)]
final class ShowAggregateCommand extends Command
{
public function __construct(
private readonly Store $store,
private readonly EventSerializer $serializer,
private readonly AggregateRootRegistry $aggregateRootRegistry,
) {
parent::__construct();
}

protected function configure(): void
{
$this
->addArgument('aggregate', InputArgument::OPTIONAL, 'aggregate name')
->addArgument('id', InputArgument::OPTIONAL, 'aggregate id');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$console = new OutputStyle($input, $output);

$aggregate = InputHelper::nullableString($input->getArgument('aggregate'));
if ($aggregate === null) {
$question = new ChoiceQuestion(
'Choose the aggregate',
array_values($this->aggregateRootRegistry->aggregateNames()),
null,
);

$aggregate = InputHelper::string($console->askQuestion($question));
}

$id = InputHelper::nullableString($input->getArgument('id'));
if ($id === null) {
$question = new Question('Enter the aggregate id');
$id = InputHelper::string($console->askQuestion($question));
}

if (!$this->aggregateRootRegistry->hasAggregateName($aggregate)) {
$console->error(sprintf('aggregate type "%s" not exists', $aggregate));

return 1;
}

$stream = $this->store->load(
new Criteria($this->aggregateRootRegistry->aggregateClass($aggregate), $id),
);

$hasMessage = false;
foreach ($stream as $message) {
$hasMessage = true;
$console->message($this->serializer, $message);
}

if ($hasMessage) {
return 0;
}

$console->error(sprintf('aggregate "%s" => "%s" not found', $aggregate, $id));

return 1;
}
}
82 changes: 39 additions & 43 deletions src/Console/Command/ShowCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,85 +6,81 @@

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Question\Question;

use function array_values;
use function sprintf;

#[AsCommand(
'event-sourcing:show',
'show events from one aggregate',
'show events from the event store',
)]
final class ShowCommand extends Command
{
public function __construct(
private readonly Store $store,
private readonly EventSerializer $serializer,
private readonly AggregateRootRegistry $aggregateRootRegistry,
) {
parent::__construct();
}

protected function configure(): void
{
$this
->addArgument('aggregate', InputArgument::OPTIONAL, 'aggregate name')
->addArgument('id', InputArgument::OPTIONAL, 'aggregate id');
->addOption(
'limit',
null,
InputOption::VALUE_REQUIRED,
'How many messages should be displayed',
10,
)
->addOption(
'forward',
null,
InputOption::VALUE_NONE,
'Show messages from the beginning of the stream',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$console = new OutputStyle($input, $output);
$limit = InputHelper::positiveIntOrZero($input->getOption('limit'));
$forward = InputHelper::bool($input->getOption('forward'));

$aggregate = InputHelper::nullableString($input->getArgument('aggregate'));
if ($aggregate === null) {
$question = new ChoiceQuestion(
'Choose the aggregate',
array_values($this->aggregateRootRegistry->aggregateNames()),
null,
);

$aggregate = InputHelper::string($console->askQuestion($question));
}
$console = new OutputStyle($input, $output);

$id = InputHelper::nullableString($input->getArgument('id'));
if ($id === null) {
$question = new Question('Enter the aggregate id');
$id = InputHelper::string($console->askQuestion($question));
}
$maxCount = $this->store->count();
$stream = $this->store->load(null, !$forward);

if (!$this->aggregateRootRegistry->hasAggregateName($aggregate)) {
$console->error(sprintf('aggregate type "%s" not exists', $aggregate));
$currentCount = 0;

return 1;
}
do {
$i = 0;

$stream = $this->store->load(
new Criteria($this->aggregateRootRegistry->aggregateClass($aggregate), $id),
);
foreach ($stream as $message) {
$i++;
$currentCount++;

$hasMessage = false;
foreach ($stream as $message) {
$hasMessage = true;
$console->message($this->serializer, $message);
}
$console->message($this->serializer, $message);

if ($hasMessage) {
return 0;
}
if ($i >= $limit) {
break;
}
}

$console->error(sprintf('aggregate "%s" => "%s" not found', $aggregate, $id));
$continue = $console->confirm(sprintf(
'Show next %d messages? (%d/%d)',
$limit,
$currentCount,
$maxCount,
));
} while ($continue);

return 1;
return 0;
}
}
4 changes: 2 additions & 2 deletions src/Console/OutputStyle.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Patchlevel\EventSourcing\Console;

use DateTimeImmutable;
use DateTimeInterface;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Serializer\Encoder\Encoder;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
Expand All @@ -31,7 +31,7 @@ public function message(EventSerializer $serializer, Message $message): void
$message->aggregateClass(),
$message->aggregateId(),
$message->playhead(),
$message->recordedOn()->format(DateTimeImmutable::ATOM),
$message->recordedOn()->format(DateTimeInterface::ATOM),
],
]);

Expand Down

0 comments on commit 957b166

Please sign in to comment.