Skip to content

Commit

Permalink
fix: calls to getMetadata which might return null
Browse files Browse the repository at this point in the history
  • Loading branch information
morrislaptop committed Sep 16, 2019
1 parent a50cfeb commit 7f1ae59
Showing 1 changed file with 40 additions and 27 deletions.
67 changes: 40 additions & 27 deletions src/Console/Commands/EventStoreWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use Carbon\Carbon;
use EventLoop\EventLoop;
use Illuminate\Support\Facades\Log;
use ReflectionClass;
use ReflectionProperty;
use Rxnet\EventStore\EventStore;
Expand Down Expand Up @@ -79,20 +80,12 @@ private function processPersistentStream($eventStore, string $stream): void
$eventStore
->persistentSubscription($stream, config('eventstore.group'), $this->option('parallel') ?? 1)
->subscribe(function (AcknowledgeableEventRecord $event) {
$this->info('[' . Carbon::now()->toDateTimeString() . '] [persistent] ' . config('eventstore.http_url') . '/streams/' . $event->getStreamId() . '/' . $event->getNumber());
$this->logEventStart($event, 'persistent');
try {
$this->dispatch($event);
$event->ack();
} catch (\Exception $e) {
dump([
'id' => $event->getId(),
'number' => $event->getNumber(),
'stream' => $event->getStreamId(),
'type' => $event->getType(),
'created' => $event->getCreated(),
'data' => $event->getData(),
'metadata' => $event->getMetadata(),
]);
$this->dumpEvent($event);
$event->nack();
report($e);
}
Expand All @@ -104,24 +97,50 @@ private function processVolatileStream($eventStore, string $stream): void
$eventStore
->volatileSubscription($stream)
->subscribe(function (EventRecord $event) {
$this->info('[' . Carbon::now()->toDateTimeString() . '] [volatile] ' . config('eventstore.http_url') . '/streams/' . $event->getStreamId() . '/' . $event->getNumber());
$this->logEventStart($event, 'volatile');
try {
$this->dispatch($event);
} catch (\Exception $e) {
dump([
'id' => $event->getId(),
'number' => $event->getNumber(),
'stream' => $event->getStreamId(),
'type' => $event->getType(),
'created' => $event->getCreated(),
'data' => $event->getData(),
'metadata' => $event->getMetadata(),
]);
$this->dumpEvent($event);
report($e);
}
}, 'report');
}

protected function logEventStart(EventRecord $event, $type = '')
{
$url = parse_url(config('eventstore.http_url'));
$url = "{$url['scheme']}://{$url['host']}:{$url['port']}";

$stream = $event->getStreamId();
$number = $event->getNumber();

Log::info("{$url}/streams/{$stream}/{$number}", ['type' => $type]);
}

protected function dumpEvent(EventRecord $event)
{
dump([
'id' => $event->getId(),
'number' => $event->getNumber(),
'stream' => $event->getStreamId(),
'type' => $event->getType(),
'created' => $event->getCreated(),
'data' => $event->getData(),
'metadata' => $this->safeGetMetadata($event),
]);
}

protected function safeGetMetadata(EventRecord $event)
{
try {
return $event->getMetadata();
}
catch (TypeError $e) {
return [];
}
}

public function dispatch(EventRecord $eventRecord): void
{
$event = $this->makeSerializableEvent($eventRecord);
Expand All @@ -141,13 +160,7 @@ private function makeSerializableEvent(EventRecord $event): JsonEventRecord
$data->setEventType($event->getType());
$data->setCreatedEpoch($event->getCreated()->getTimestamp() * 1000);
$data->setData(json_encode($event->getData()));

try {
$data->setMetadata(json_encode($event->getMetadata()));
}
catch (TypeError $e) {

}
$data->setMetadata(json_encode($this->safeGetMetadata($event)));

return new JsonEventRecord($data);
}
Expand Down

0 comments on commit 7f1ae59

Please sign in to comment.