Skip to content

Commit

Permalink
DR-897 volatile subscription events (#7)
Browse files Browse the repository at this point in the history
feat: ability to run volatile or persistent streams

BREAKING CHANGE: Split `streams` config var into `SUBSCRIPTION_STREAMS` and `VOLATILE_STREAMS`
  • Loading branch information
Craig Morris authored Sep 9, 2019
1 parent 505285d commit a00aad9
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 31 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ EVENTSTORE_STREAMS_ACCOUNTS=accounts-v13

EVENTSTORE_SUBSCRIPTION=reporting-service
EVENTSTORE_SUBSCRIPTION_STREAMS=accounts
EVENTSTORE_VOLATILE_STREAMS=quotes-v2,accounts-v13
37 changes: 33 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,36 @@ class AccountCreatedTest extends TestCase

You must first run the worker which will listen for events.

`php artisan eventstore:worker`
*None of the options are required. By default it will run the persistance subscription with a timeout of 10 seconds and 1 parallel event at a time.*

``` sh
$ php artisan eventstore:worker
{--persist : Run persistent mode.}
{--volatile : Run volatile mode.}
{--parallel= : How many events to run in parallel.}
{--timeout= : How long the event should time out for.}

$ php artisan eventstore:worker --persist

$ php artisan eventstore:worker --persist --timeout=10

$ php artisan eventstore:worker --persist --parallel=10

$ php artisan eventstore:worker --persist --parallel=10 --timeout=5

$ php artisan eventstore:worker --volatile

$ php artisan eventstore:worker --volatile --timeout=10

$ php artisan eventstore:worker --persist --volatile

$ php artisan eventstore:worker --persist --volatile --timeout=10

$ php artisan eventstore:worker --persist --volatile --parallel=10

$ php artisan eventstore:worker --persist --volatile --parallel=10 --timeout=5

```

When an event is received, it will be mapped to the Laravel event and the original `EventRecord` can be accessed via `getEventRecord()`.

Expand Down Expand Up @@ -216,11 +245,11 @@ The defaults are set in `config/eventstore.php`. Copy this file to your own conf
return [
'tcp_url' => 'tls://admin:changeit@localhost:1113',
'http_url' => 'http://admin:changeit@localhost:2113',
'streams' => ['accounts'],
'group' => 'account-email-subscription',
'namespace' => 'App\Events',
'volatile_streams' => ['quotes', 'accounts'],
'subscription_streams' => ['quotes', 'accounts'],
'event_to_class' => function ($event) {
return $event->getType();
return 'App\Events\\' . $event->getType();
}
];
```
Expand Down
3 changes: 2 additions & 1 deletion config/eventstore.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
return [
'tcp_url' => env('EVENTSTORE_TCP_URL', 'tls://admin:changeit@localhost:1113'),
'http_url' => env('EVENTSTORE_HTTP_URL', 'http://admin:changeit@localhost:2113'),
'streams' => [],
'subscription_streams' => [],
'volatile_streams' => [],
'group' => '',
'event_to_class' => function ($event) {
return 'App\Events\\' . $event->getType();
Expand Down
81 changes: 59 additions & 22 deletions src/Console/Commands/EventStoreWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use DigitalRisks\LaravelEventStore\Contracts\CouldBeReceived;
use Illuminate\Console\Command;

use Carbon\Carbon;
use EventLoop\EventLoop;
use ReflectionClass;
use ReflectionProperty;
Expand All @@ -16,15 +17,17 @@

class EventStoreWorker extends Command
{
private $loop;

private $timeout = 10;

protected $signature = 'eventstore:worker --replay';
protected $signature = 'eventstore:worker
{--persist : Run persistent mode.}
{--volatile : Run volatile mode.}
{--parallel= : How many events to run in parallel.}
{--timeout= : How long the event should time out for.}';

protected $description = 'Worker handling incoming events from ES';

protected $eventstore;
private $loop;

private $timeout = 10;

public function __construct()
{
Expand All @@ -33,8 +36,10 @@ public function __construct()
$this->loop = EventLoop::getLoop();
}

public function handle()
public function handle(): void
{
$timeout = $this->option('timeout') ?? 10;

$this->loop->stop();

try {
Expand All @@ -44,35 +49,46 @@ public function handle()
report($e);
}

$this->error("Lost connection with EventStore - reconnecting in $this->timeout");
$this->error('Lost connection with EventStore - reconnecting in ' . $timeout);

sleep($this->timeout);
sleep($timeout);

$this->handle();
}

public function processAllStreams()
private function processAllStreams(): void
{
$streams = config('eventstore.streams');
if ($this->option('persist') || (!$this->option('persist') && !$this->option('volatile'))) {
$this->connectToStream(config('eventstore.subscription_streams'), function (EventStore $eventStore, string $stream) {
$this->processPersistentStream($eventStore, $stream);
});
}

if ($this->option('volatile')) {
$this->connectToStream(config('eventstore.volatile_streams'), function (EventStore $eventStore, string $stream) {
$this->processVolatileStream($eventStore, $stream);
});
}
}


private function connectToStream($streams, $callback): void
{
foreach ($streams as $stream) {
$eventStore = new EventStore();
$connection = $eventStore->connect(config('eventstore.tcp_url'));

$connection->subscribe(function () use ($eventStore, $stream) {
$this->processStream($eventStore, $stream);
$connection->subscribe(function () use ($eventStore, $stream, $callback) {
$callback($eventStore, $stream);
}, 'report');
}
}

private function processStream($eventStore, string $stream)
private function processPersistentStream($eventStore, string $stream): void
{
$eventStore
->persistentSubscription($stream, config('eventstore.group'))
->persistentSubscription($stream, config('eventstore.group'), $this->option('parallel') ?? 1)
->subscribe(function (AcknowledgeableEventRecord $event) {
$url = config('eventstore.http_url')."/streams/{$event->getStreamId()}/{$event->getNumber()}";
$this->info($url);

$this->info('[' . Carbon::now()->toDateTimeString() . '] [persistent] ' . config('eventstore.http_url') . '/streams/ ' . $event->getStreamId() . '/' . $event->getNumber());
try {
$this->dispatch($event);
$event->ack();
Expand All @@ -86,15 +102,36 @@ private function processStream($eventStore, string $stream)
'data' => $event->getData(),
'metadata' => $event->getMetadata(),
]);

$event->nack();
report($e);
}
}, 'report');
}

private function processVolatileStream($eventStore, string $stream): void
{
$eventStore
->volatileSubscription($stream)
->subscribe(function (EventRecord $event) {
$this->info('[' . Carbon::now()->toDateTimeString() . '] [volatile] ' . config('eventstore.http_url') . '/streams/ ' . $event->getStreamId() . '/' . $event->getNumber());
try {
$this->dispatch($event);
} catch (\Exception $e) {
dump([
'id' => $event->getId(),
'number' => $event->getNumber(),
'stream' => $event->getStreamId(),
'type' => $event->getType(),
'created' => $event->getCreated(),
'data' => $event->getData(),
'metadata' => $event->getMetadata(),
]);
report($e);
}
}, 'report');
}

public function dispatch(EventRecord $eventRecord)
public function dispatch(EventRecord $eventRecord): void
{
$event = $this->makeSerializableEvent($eventRecord);

Expand All @@ -106,7 +143,7 @@ public function dispatch(EventRecord $eventRecord)
}
}

protected function makeSerializableEvent(EventRecord $event)
private function makeSerializableEvent(EventRecord $event): JsonEventRecord
{
$data = new EventData();

Expand Down
6 changes: 2 additions & 4 deletions tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class WorkerTest extends TestCase
{
use InteractsWithEventStore, MakesEventRecords;

/** @test */
public function it_dispatches_an_event_from_a_subscribed_event()
public function test_it_dispatches_an_event_from_a_subscribed_event()
{
// Arrange.
Event::fake();
Expand All @@ -32,8 +31,7 @@ public function it_dispatches_an_event_from_a_subscribed_event()
});
}

/** @test */
public function it_dispatches_a_classed_event_from_a_subscribed_event()
public function test_it_dispatches_a_classed_event_from_a_subscribed_event()
{
// Arrange.
Event::fake();
Expand Down

0 comments on commit a00aad9

Please sign in to comment.