Skip to content

Commit

Permalink
refactor: introduce subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek committed May 3, 2022
1 parent 37796a0 commit aeb3700
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 14 deletions.
10 changes: 7 additions & 3 deletions src/Hub/Controller/SubscribeController.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Freddie\Hub\HubControllerInterface;
use Freddie\Hub\HubInterface;
use Freddie\Message\Update;
use Freddie\Subscription\Subscriber;
use Lcobucci\JWT\UnencryptedToken;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
Expand Down Expand Up @@ -57,6 +58,8 @@ public function __invoke(
$allowedTopics = $this->extractAllowedTopics($request);
$lastEventId = extract_last_event_id($request);

$subscriber = new Subscriber($subscribedTopics);

if (null !== $lastEventId) {
async(
function () use ($lastEventId, $stream, $subscribedTopics, $allowedTopics) {
Expand All @@ -68,15 +71,16 @@ function () use ($lastEventId, $stream, $subscribedTopics, $allowedTopics) {
}

async(
function () use ($stream, $subscribedTopics, $allowedTopics) {
function () use ($stream, $subscribedTopics, $allowedTopics, $subscriber) {
$callback = fn(Update $update) => $this->sendUpdate(
$update,
$stream,
$subscribedTopics,
$allowedTopics
);
$this->hub->subscribe($callback);
$stream->on('close', fn() => $this->hub->unsubscribe($callback));
$subscriber->setCallback($callback);
$this->hub->subscribe($subscriber);
$stream->on('close', fn() => $this->hub->unsubscribe($subscriber));
}
)();

Expand Down
9 changes: 5 additions & 4 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Freddie\Hub\Transport\PHP\PHPTransport;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Freddie\Subscription\Subscriber;
use Generator;
use InvalidArgumentException;
use React\EventLoop\Loop;
Expand Down Expand Up @@ -75,14 +76,14 @@ public function publish(Update $update): PromiseInterface
});
}

public function subscribe(callable $callback): void
public function subscribe(Subscriber $subscriber): void
{
$this->transport->subscribe($callback);
$this->transport->subscribe($subscriber);
}

public function unsubscribe(callable $callback): void
public function unsubscribe(Subscriber $subscriber): void
{
$this->transport->unsubscribe($callback);
$this->transport->unsubscribe($subscriber);
}

public function reconciliate(string $lastEventID): Generator
Expand Down
5 changes: 3 additions & 2 deletions src/Hub/HubInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Freddie\Hub;

use Freddie\Message\Update;
use Freddie\Subscription\Subscriber;
use Generator;
use React\Promise\PromiseInterface;

Expand All @@ -17,9 +18,9 @@ public function getOption(string $name): mixed;
*/
public function publish(Update $update): PromiseInterface;

public function subscribe(callable $callback): void;
public function subscribe(Subscriber $subscriber): void;

public function unsubscribe(callable $callback): void;
public function unsubscribe(Subscriber $subscriber): void;

/**
* @param string $lastEventID
Expand Down
3 changes: 3 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Freddie\Message;

use Symfony\Component\Uid\Ulid;

use function explode;
use function str_contains;

Expand All @@ -18,6 +20,7 @@ public function __construct(
public ?string $event = null,
public ?int $retry = null,
) {
$this->id ??= (string) new Ulid();
}

public function __toString(): string
Expand Down
9 changes: 8 additions & 1 deletion src/Message/Update.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@
use Freddie\Helper\TopicHelper;

use function Freddie\topic;
use function is_string;

final class Update
{
/**
* @var string[]
*/
public array $topics;

/**
* @param string[] $topics
*/
public function __construct(
public array $topics,
array|string $topics,
public Message $message,
) {
$this->topics = is_string($topics) ? [$topics] : $topics;
}

/**
Expand Down
50 changes: 50 additions & 0 deletions src/Subscription/Subscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Freddie\Subscription;

use Symfony\Component\Uid\Ulid;

use function array_map;

final class Subscriber
{
/**
* @var Subscription[]
*/
public readonly array $subscriptions;

/**
* @var callable
*/
private $callback;

/**
* @param string[] $topics
*/
public function __construct(
public readonly array $topics,
public readonly mixed $payload = null,
public readonly Ulid $id = new Ulid(),
public bool $active = true,
) {
$this->subscriptions = array_map(
fn(string $topic) => new Subscription($this, $topic),
$this->topics,
);
}

public function setCallback(callable $callback): void
{
$this->callback = $callback;
}

/**
* @param mixed ...$args
*/
public function __invoke(...$args): void
{
($this->callback)(...$args);
}
}
14 changes: 14 additions & 0 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Freddie\Subscription;

final class Subscription
{
public function __construct(
public readonly Subscriber $subscriber,
public readonly string $topic,
) {
}
}
11 changes: 7 additions & 4 deletions tests/Unit/Hub/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Message;
use Freddie\Message\Update;
use Freddie\Subscription\Subscriber;
use Generator;
use InvalidArgumentException;
use React\Promise\PromiseInterface;
Expand Down Expand Up @@ -48,19 +49,21 @@ public function reconciliate(string $lastEventID): Generator
$hub = new Hub(transport: $transport);
$update = new Update(['foo'], new Message(Ulid::generate()));
$subscribeFn = fn () => 'bar';
$subscriber = new Subscriber(['foo']);
$subscriber->setCallback($subscribeFn);
$lastEventId = Ulid::generate();

// When
$hub->publish($update);
$hub->subscribe($subscribeFn);
$hub->subscribe($subscriber);
iterator_to_array($hub->reconciliate($lastEventId));
$hub->unsubscribe($subscribeFn);
$hub->unsubscribe($subscriber);

// Then
expect($transport->called['publish'])->toBe([$update]);
expect($transport->called['subscribe'])->toBe([$subscribeFn]);
expect($transport->called['subscribe'])->toBe([$subscriber]);
expect($transport->called['reconciliate'])->toBe([$lastEventId]);
expect($transport->called['unsubscribe'])->toBe([$subscribeFn]);
expect($transport->called['unsubscribe'])->toBe([$subscriber]);
});

it('complains when requesting an unrecognized option', function () {
Expand Down
7 changes: 7 additions & 0 deletions tests/Unit/Message/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Freddie\Tests\Unit\Message;

use Freddie\Message\Message;
use Symfony\Component\Uid\Ulid;

it('stringifies messages', function (Message $message, string $expected) {
expect((string) $message)->toBe($expected);
Expand Down Expand Up @@ -34,3 +35,9 @@
"id:1\nevent:message\ndata:foo\ndata:bar\n\n",
];
});

it('has a default id', function () {
$message = new Message();
expect($message->id)->not()->toBeNull();
expect(Ulid::isValid($message->id))->toBeTrue();
});
5 changes: 5 additions & 0 deletions tests/Unit/Message/UpdateTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@
'expected' => true, // Private update allowed on all
];
});

it('accepts a single topic', function () {
$update = new Update('/foo', new Message());
expect($update->topics)->toBe(['/foo']);
});

0 comments on commit aeb3700

Please sign in to comment.