diff --git a/src/Console/Commands/EventStoreWorker.php b/src/Console/Commands/EventStoreWorker.php index 758262b..73e554a 100644 --- a/src/Console/Commands/EventStoreWorker.php +++ b/src/Console/Commands/EventStoreWorker.php @@ -7,6 +7,7 @@ use Carbon\Carbon; use EventLoop\EventLoop; +use Illuminate\Support\Facades\Log; use ReflectionClass; use ReflectionProperty; use Rxnet\EventStore\EventStore; @@ -79,20 +80,12 @@ private function processPersistentStream($eventStore, string $stream): void $eventStore ->persistentSubscription($stream, config('eventstore.group'), $this->option('parallel') ?? 1) ->subscribe(function (AcknowledgeableEventRecord $event) { - $this->info('[' . Carbon::now()->toDateTimeString() . '] [persistent] ' . config('eventstore.http_url') . '/streams/' . $event->getStreamId() . '/' . $event->getNumber()); + $this->logEventStart($event, 'persistent'); try { $this->dispatch($event); $event->ack(); } catch (\Exception $e) { - dump([ - 'id' => $event->getId(), - 'number' => $event->getNumber(), - 'stream' => $event->getStreamId(), - 'type' => $event->getType(), - 'created' => $event->getCreated(), - 'data' => $event->getData(), - 'metadata' => $event->getMetadata(), - ]); + $this->dumpEvent($event); $event->nack(); report($e); } @@ -104,24 +97,50 @@ private function processVolatileStream($eventStore, string $stream): void $eventStore ->volatileSubscription($stream) ->subscribe(function (EventRecord $event) { - $this->info('[' . Carbon::now()->toDateTimeString() . '] [volatile] ' . config('eventstore.http_url') . '/streams/' . $event->getStreamId() . '/' . $event->getNumber()); + $this->logEventStart($event, 'volatile'); try { $this->dispatch($event); } catch (\Exception $e) { - dump([ - 'id' => $event->getId(), - 'number' => $event->getNumber(), - 'stream' => $event->getStreamId(), - 'type' => $event->getType(), - 'created' => $event->getCreated(), - 'data' => $event->getData(), - 'metadata' => $event->getMetadata(), - ]); + $this->dumpEvent($event); report($e); } }, 'report'); } + protected function logEventStart(EventRecord $event, $type = '') + { + $url = parse_url(config('eventstore.http_url')); + $url = "{$url['scheme']}://{$url['host']}:{$url['port']}"; + + $stream = $event->getStreamId(); + $number = $event->getNumber(); + + Log::info("{$url}/streams/{$stream}/{$number}", ['type' => $type]); + } + + protected function dumpEvent(EventRecord $event) + { + dump([ + 'id' => $event->getId(), + 'number' => $event->getNumber(), + 'stream' => $event->getStreamId(), + 'type' => $event->getType(), + 'created' => $event->getCreated(), + 'data' => $event->getData(), + 'metadata' => $this->safeGetMetadata($event), + ]); + } + + protected function safeGetMetadata(EventRecord $event) + { + try { + return $event->getMetadata(); + } + catch (TypeError $e) { + return []; + } + } + public function dispatch(EventRecord $eventRecord): void { $event = $this->makeSerializableEvent($eventRecord); @@ -141,13 +160,7 @@ private function makeSerializableEvent(EventRecord $event): JsonEventRecord $data->setEventType($event->getType()); $data->setCreatedEpoch($event->getCreated()->getTimestamp() * 1000); $data->setData(json_encode($event->getData())); - - try { - $data->setMetadata(json_encode($event->getMetadata())); - } - catch (TypeError $e) { - - } + $data->setMetadata(json_encode($this->safeGetMetadata($event))); return new JsonEventRecord($data); }