diff --git a/src/Console/Commands/EventStoreWorker.php b/src/Console/Commands/EventStoreWorker.php index d37d1d0..ac40e2a 100644 --- a/src/Console/Commands/EventStoreWorker.php +++ b/src/Console/Commands/EventStoreWorker.php @@ -2,187 +2,69 @@ namespace DigitalRisks\LaravelEventStore\Console\Commands; -use DigitalRisks\LaravelEventStore\Contracts\CouldBeReceived; -use DigitalRisks\LaravelEventStore\EventStore as LaravelEventStore; -use EventLoop\EventLoop; use Illuminate\Console\Command; -use Illuminate\Support\Facades\Log; -use ReflectionClass; -use ReflectionProperty; -use Rxnet\EventStore\Data\EventRecord as EventData; -use Rxnet\EventStore\EventStore; -use Rxnet\EventStore\Record\AcknowledgeableEventRecord; -use Rxnet\EventStore\Record\EventRecord; -use Rxnet\EventStore\Record\JsonEventRecord; -use TypeError; +use Symfony\Component\Process\Process; class EventStoreWorker extends Command { - protected $signature = 'eventstore:worker {--parallel= : How many events to run in parallel.} {--timeout= : How long the event should time out for.}'; + protected $signature = 'eventstore:worker'; - protected $description = 'Worker handling incoming events from ES'; + protected $description = 'Worker handling running stream processes'; - private $loop; - - public function __construct() - { - parent::__construct(); - - $this->loop = EventLoop::getLoop(); - } - - public function handle(): void - { - $timeout = $this->option('timeout') ?? 10; - - $this->loop->stop(); - - try { - $this->processAllStreams(); - $this->loop->run(); - } catch (\Exception $e) { - report($e); - } - - $this->error('Lost connection with EventStore - reconnecting in ' . $timeout); - - sleep($timeout); - - $this->handle(); - } - - private function processAllStreams(): void - { - $this->connectToStream(config('eventstore.subscription_streams'), function (EventStore $eventStore, string $stream) { - $this->processPersistentStream($eventStore, $stream); - }); - - $this->connectToStream(config('eventstore.volatile_streams'), function (EventStore $eventStore, string $stream) { - $this->processVolatileStream($eventStore, $stream); - }); - } - - - private function connectToStream($streams, $callback): void + protected function spawnProcess($stream, $type = 'persistent') { - foreach ($streams as $stream) { - $eventStore = new EventStore(); - $connection = $eventStore->connect(config('eventstore.tcp_url')); - $connection->subscribe(function () use ($eventStore, $stream, $callback) { - $callback($eventStore, $stream); - }, 'report'); + if (empty($stream)) { + return null; } - } - private function processPersistentStream($eventStore, string $stream): void - { - $eventStore - ->persistentSubscription($stream, config('eventstore.group'), $this->option('parallel') ?? 1) - ->subscribe(function (AcknowledgeableEventRecord $event) { - try { - $this->dispatch($event); - $event->ack(); - } catch (\Exception $e) { - $this->dumpEvent($event); - $event->nack(); - report($e); - } - }, 'report'); - } + $command = "php artisan eventstore:worker-thread --stream={$stream} --type={$type}"; + $process = Process::fromShellCommandline($command); + $process->start(); - private function processVolatileStream($eventStore, string $stream): void - { - $eventStore - ->volatileSubscription($stream) - ->subscribe(function (EventRecord $event) { - try { - $this->dispatch($event); - } catch (\Exception $e) { - $this->dumpEvent($event); - report($e); - } - }, 'report'); + return $process; } - protected function dumpEvent(EventRecord $event) - { - dump([ - 'id' => $event->getId(), - 'number' => $event->getNumber(), - 'stream' => $event->getStreamId(), - 'type' => $event->getType(), - 'created' => $event->getCreated(), - 'data' => $event->getData(), - 'metadata' => $this->safeGetMetadata($event), - ]); - } - - protected function safeGetMetadata(EventRecord $event) - { - try { - return $event->getMetadata() ?? []; - } catch (TypeError $e) { - return []; - } - } - - public function dispatch(EventRecord $eventRecord): void - { - $logger = LaravelEventStore::$logger; - $serializedEvent = $payload = $this->makeSerializableEvent($eventRecord); - $event = $serializedEvent->getType(); - - if ($localEvent = $this->mapToLocalEvent($serializedEvent)) { - $event = $localEvent; - $payload = null; - } - - $logger($serializedEvent, $event); - event($event, $payload); - } - - private function makeSerializableEvent(EventRecord $event): JsonEventRecord - { - $data = new EventData(); - - $data->setEventId($event->getId()); - $data->setEventType($event->getType()); - $data->setEventNumber($event->getNumber()); - $data->setData(json_encode($event->getData())); - $data->setEventStreamId($event->getStreamId()); - $data->setMetadata(json_encode($this->safeGetMetadata($event))); - $data->setCreatedEpoch($event->getCreated()->getTimestamp() * 1000); - - - return new JsonEventRecord($data); - } - - protected function mapToLocalEvent($event) + public function handle(): void { - $eventToClass = LaravelEventStore::$eventToClass; - $className = $eventToClass ? $eventToClass($event) : 'App\Events\\' . $event->getType(); - - if (!class_exists($className)) { - return; + $entries = []; + + foreach (config('eventstore.subscription_streams') as $stream) { + if (($process = $this->spawnProcess($stream, 'persistent')) !== null) { + $entries[] = [ + 'process' => $process, + 'stream' => $stream, + 'type' => 'persistent' + ]; + } } - $reflection = new ReflectionClass($className); - - if (!$reflection->implementsInterface(CouldBeReceived::class)) { - return; + foreach (config('eventstore.volatile_streams') as $stream) { + if (($process = $this->spawnProcess($stream, 'volatile')) !== null) { + $entries[] = [ + 'process' => $process, + 'stream' => $stream, + 'type' => 'volatile' + ]; + } } - $localEvent = new $className(); - $props = $reflection->getProperties(ReflectionProperty::IS_PUBLIC); - $data = $event->getData(); + while (true) { + foreach ($entries as $key => $entry) { + if (!$entry['process']->isRunning()) { + $this->info("Process {$key} {$entry['process']->getCommandLine()} stopped running - restarting"); + unset($entries[$key]); + + if (($process = $this->spawnProcess($entry['stream'], $entry['type'])) !== null) { + $entries[] = [ + 'process' => $process, + 'stream' => $entry['stream'], + 'type' => $entry['type'] + ]; + } + } + } - foreach ($props as $prop) { - $key = $prop->getName(); - $localEvent->$key = $data[$key] ?? null; + sleep(1); } - - $localEvent->setEventRecord($event); - - return $localEvent; } } diff --git a/src/Console/Commands/EventStoreWorkerThread.php b/src/Console/Commands/EventStoreWorkerThread.php new file mode 100644 index 0000000..306579c --- /dev/null +++ b/src/Console/Commands/EventStoreWorkerThread.php @@ -0,0 +1,174 @@ +loop = EventLoop::getLoop(); + } + + public function handle(): void + { + if (!$this->option('stream')) { + $this->info("Stream option is required"); + return; + } + + $this->loop->stop(); + try { + $this->processStream(); + $this->loop->run(); + } catch (\Exception $e) { + report($e); + } + + $this->error('Lost connection with EventStore - reconnecting'); + sleep(1); + + $this->handle(); + } + + private function connect($callback): void + { + $eventStore = new EventStore(); + $eventStore->connect(config('eventstore.tcp_url')) + ->subscribe(function () use ($callback, $eventStore) { + $callback($eventStore); + }, 'report'); + } + + private function processStream(): void + { + $this->connect(function ($eventStore) { + if ($this->option('type') == 'volatile') { + $this->processVolatileStream($eventStore, $this->option('stream')); + } + + if ($this->option('type') == 'persistent') { + $this->processPersistentStream($eventStore, $this->option('stream')); + } + }); + } + + private function processPersistentStream(EventStore $eventStore, string $stream): void + { + $eventStore->persistentSubscription($stream, config('eventstore.group')) + ->subscribe(function (AcknowledgeableEventRecord $event) { + try { + $this->dispatch($event); + + return $event->ack(); + } catch (\Exception $e) { + report($e); + + return $event->nack($event::NACK_ACTION_PARK); + } + }); + } + + private function processVolatileStream(EventStore $eventStore, string $stream): void + { + $eventStore->volatileSubscription($stream) + ->subscribe(function (EventRecord $event) { + try { + $this->dispatch($event); + } catch (\Exception $e) { + $this->dumpEvent($event); + report($e); + } + }, 'report'); + } + + protected function safeGetMetadata(EventRecord $event) + { + try { + return $event->getMetadata() ?? []; + } catch (TypeError $e) { + return []; + } + } + + public function dispatch(EventRecord $eventRecord): void + { + $logger = LaravelEventStore::$logger; + $serializedEvent = $payload = $this->makeSerializableEvent($eventRecord); + $event = $serializedEvent->getType(); + + if ($localEvent = $this->mapToLocalEvent($serializedEvent)) { + $event = $localEvent; + $payload = null; + } + + $logger($serializedEvent, $event); + event($event, $payload); + } + + private function makeSerializableEvent(EventRecord $event): JsonEventRecord + { + $data = new EventData(); + + $data->setEventId($event->getId()); + $data->setEventType($event->getType()); + $data->setEventNumber($event->getNumber()); + $data->setData(json_encode($event->getData())); + $data->setEventStreamId($event->getStreamId()); + $data->setMetadata(json_encode($this->safeGetMetadata($event))); + $data->setCreatedEpoch($event->getCreated()->getTimestamp() * 1000); + + return new JsonEventRecord($data); + } + + protected function mapToLocalEvent($event) + { + $eventToClass = LaravelEventStore::$eventToClass; + $className = $eventToClass ? $eventToClass($event) : 'App\Events\\' . $event->getType(); + + if (!class_exists($className)) { + return; + } + + $reflection = new ReflectionClass($className); + + if (!$reflection->implementsInterface(CouldBeReceived::class)) { + return; + } + + $localEvent = new $className(); + $props = $reflection->getProperties(ReflectionProperty::IS_PUBLIC); + $data = $event->getData(); + + foreach ($props as $prop) { + $key = $prop->getName(); + $localEvent->$key = $data[$key] ?? null; + } + + $localEvent->setEventRecord($event); + + return $localEvent; + } +} diff --git a/src/ServiceProvider.php b/src/ServiceProvider.php index c793139..eea4c08 100755 --- a/src/ServiceProvider.php +++ b/src/ServiceProvider.php @@ -4,6 +4,7 @@ use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreReset; use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreWorker; +use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreWorkerThread; use DigitalRisks\LaravelEventStore\Contracts\ShouldBeStored; use DigitalRisks\LaravelEventStore\EventStore; use DigitalRisks\LaravelEventStore\Listeners\SendToEventStoreListener; @@ -24,6 +25,7 @@ public function boot() $this->commands([ EventStoreWorker::class, + EventStoreWorkerThread::class, EventStoreReset::class, ]); } diff --git a/tests/LoggerTest.php b/tests/LoggerTest.php index 5764f0c..41445d4 100644 --- a/tests/LoggerTest.php +++ b/tests/LoggerTest.php @@ -2,9 +2,7 @@ namespace DigitalRisks\LaravelEventStore\Tests; -use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreWorker; -use DigitalRisks\LaravelEventStore\EventStore; -use DigitalRisks\LaravelEventStore\Tests\Fixtures\TestEvent; +use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreWorkerThread; use DigitalRisks\LaravelEventStore\Tests\Traits\MakesEventRecords; use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Log; @@ -19,7 +17,7 @@ public function test_it_logs_an_event() // Arrange. Log::swap(new LogFake); - $worker = resolve(EventStoreWorker::class); + $worker = resolve(EventStoreWorkerThread::class); $event = $this->makeEventRecord('test_event', ['hello' => 'world']); // Act. @@ -39,7 +37,7 @@ public function test_it_logs_an_event_with_a_listener() // Arrange. Log::swap(new LogFake); - $worker = resolve(EventStoreWorker::class); + $worker = resolve(EventStoreWorkerThread::class); $event = $this->makeEventRecord('test_event', ['hello' => 'world']); Event::listen('test_event', function () { }); diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php index 75f28b0..7590d3b 100644 --- a/tests/WorkerTest.php +++ b/tests/WorkerTest.php @@ -2,7 +2,7 @@ namespace DigitalRisks\LaravelEventStore\Tests; -use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreWorker; +use DigitalRisks\LaravelEventStore\Console\Commands\EventStoreWorkerThread; use DigitalRisks\LaravelEventStore\EventStore; use DigitalRisks\LaravelEventStore\Tests\Fixtures\TestEvent; use DigitalRisks\LaravelEventStore\Tests\Traits\InteractsWithEventStore; @@ -18,7 +18,7 @@ public function test_it_dispatches_an_event_from_a_subscribed_event() { // Arrange. Event::fake(); - $worker = resolve(EventStoreWorker::class); + $worker = resolve(EventStoreWorkerThread::class); $event = $this->makeEventRecord('event_with_no_class', ['hello' => 'world']); // Act. @@ -36,7 +36,7 @@ public function test_it_dispatches_a_classed_event_from_a_subscribed_event() { // Arrange. Event::fake(); - $worker = resolve(EventStoreWorker::class); + $worker = resolve(EventStoreWorkerThread::class); $event = $this->makeEventRecord('test_event', ['hello' => 'world']); EventStore::eventToClass(function ($event) { @@ -58,7 +58,7 @@ public function test_it_handles_an_event_with_no_metadata() { // Arrange. Event::fake(); - $worker = resolve(EventStoreWorker::class); + $worker = resolve(EventStoreWorkerThread::class); $event = $this->makeEventRecord('test_event', ['hello' => 'world'], null); // Act.